refactor: 重构项目结构,将geo_tools重命名为app并更新相关引用

- 将主包名从geo_tools改为app
- 更新所有模块中的引用路径
- 迁移并更新测试用例
- 添加项目规则文档
- 保持原有功能不变,仅进行结构调整
This commit is contained in:
2026-04-12 19:49:56 +08:00
parent fcb8e1f255
commit db51d41aef
41 changed files with 4132 additions and 808 deletions

1
app/core/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""geo_tools.core 包 —— 核心地理处理层。"""

469
app/core/geometry.py Normal file
View File

@@ -0,0 +1,469 @@
"""
geo_tools.core.geometry
~~~~~~~~~~~~~~~~~~~~~~~
基于 Shapely 2.x 的几何运算工具函数。
"""
from __future__ import annotations
from typing import Literal, Sequence
import shapely
from shapely.geometry import (
LinearRing,
LineString,
MultiLineString,
MultiPoint,
MultiPolygon,
Point,
Polygon,
)
from shapely.geometry.base import BaseGeometry
from app.utils.logger import get_logger
from app.utils.validators import ensure_valid_geometry
logger = get_logger(__name__)
# ── 几何有效性 ────────────────────────────────────────────────────────────────
def is_valid_geometry(geom: BaseGeometry | None) -> bool:
"""判断几何对象是否有效(非空且通过 Shapely 合法性检查)。
Parameters
----------
geom:
输入几何对象,可为 None。
Returns
-------
bool
如果几何对象有效且非空,返回 True否则返回 False。
"""
if geom is None:
return False
return bool(geom.is_valid and not geom.is_empty)
def fix_geometry(geom: BaseGeometry | None) -> BaseGeometry | None:
"""尝试修复无效几何。
依次尝试:
1. ``buffer(0)`` — 适合大多数自相交多边形
2. ``make_valid``Shapely 2.x— 覆盖更多情形
Parameters
----------
geom:
输入几何对象,可为 None。
Returns
-------
BaseGeometry | None
修复后的几何;无法修复时返回 ``None``。
Notes
-----
对于复杂的无效几何,可能无法完全修复,此时会返回 None。
"""
if geom is None:
return None
if geom.is_valid:
return geom
# 方法一buffer(0)
try:
fixed = geom.buffer(0)
if fixed.is_valid and not fixed.is_empty:
return fixed
except Exception:
pass
# 方法二shapely.make_validShapely >= 1.8
try:
fixed = shapely.make_valid(geom)
if fixed.is_valid and not fixed.is_empty:
return fixed
except Exception:
pass
logger.warning("无法修复几何:%r", geom.geom_type)
return None
def explain_validity(geom: BaseGeometry) -> str:
"""返回 Shapely 对该几何的有效性说明(英文)。
Parameters
----------
geom:
输入几何对象。
Returns
-------
str
Shapely 生成的有效性说明字符串。
"""
from shapely.validation import explain_validity as _explain
return _explain(geom)
# ── 基础几何运算 ───────────────────────────────────────────────────────────────
def buffer_geometry(
geom: BaseGeometry,
distance: float,
cap_style: Literal["round", "square", "flat"] = "round",
join_style: Literal["round", "mitre", "bevel"] = "round",
resolution: int = 16,
verbose: bool = False,
) -> BaseGeometry | None:
"""对几何对象执行缓冲区运算。
Parameters
----------
geom:
输入几何。
distance:
缓冲距离(单位与 CRS 一致;地理坐标系单位为度)。
cap_style:
端头样式:"round"(圆角)、"square"(方角)、"flat"(平角)(仅线要素有效)。
join_style:
转角样式:"round"(圆角)、"mitre"(斜角)、"bevel"(尖角)。
resolution:
圆弧逼近精度(段数),默认 16。
verbose:
是否输出修复几何的警告信息,默认 False。
Returns
-------
BaseGeometry | None
缓冲区运算后的几何对象;失败时返回 None。
"""
try:
geom = ensure_valid_geometry(geom, verbose)
return geom.buffer(distance, cap_style=cap_style, join_style=join_style, resolution=resolution)
except Exception as e:
logger.warning(f"缓冲区计算失败,已跳过,原因:{str(e)}")
return None
def centroid(geom: BaseGeometry, verbose: bool = False) -> Point | None:
"""返回几何的质心点。
Parameters
----------
geom:
输入几何。
verbose:
是否输出修复几何的警告信息,默认 False。
Returns
-------
Point | None
几何的质心点;失败时返回 None。
"""
try:
geom = ensure_valid_geometry(geom, verbose)
return geom.centroid
except Exception as e:
logger.warning(f"质心计算失败,已跳过,原因:{str(e)}")
return None
def bounding_box(geom: BaseGeometry, verbose: bool = False) -> Polygon | None:
"""返回几何的最小外接矩形BBOX为多边形。
Parameters
----------
geom:
输入几何。
verbose:
是否输出修复几何的警告信息,默认 False。
Returns
-------
Polygon | None
最小外接矩形多边形;失败时返回 None。
"""
try:
geom = ensure_valid_geometry(geom, verbose)
from shapely.geometry import box
return box(*geom.bounds)
except Exception as e:
logger.warning(f"最小外接矩形计算失败,已跳过,原因:{str(e)}")
return None
def convex_hull(geom: BaseGeometry, verbose: bool = False) -> BaseGeometry | None:
"""返回几何的凸包。
Parameters
----------
geom:
输入几何。
verbose:
是否输出修复几何的警告信息,默认 False。
Returns
-------
BaseGeometry | None
几何的凸包;失败时返回 None。
"""
try:
geom = ensure_valid_geometry(geom, verbose)
return geom.convex_hull
except Exception as e:
logger.warning(f"凸包计算失败,已跳过,原因:{str(e)}")
return None
# ── 集合运算 ──────────────────────────────────────────────────────────────────
def intersect(geom_a: BaseGeometry, geom_b: BaseGeometry, verbose: bool = False) -> BaseGeometry | None:
"""返回两几何的交集。
Parameters
----------
geom_a:
第一个几何。
geom_b:
第二个几何。
verbose:
是否输出修复几何的警告信息,默认 False。
Returns
-------
BaseGeometry | None
两几何的交集;失败时返回 None。
"""
try:
geom_a = ensure_valid_geometry(geom_a, verbose)
geom_b = ensure_valid_geometry(geom_b, verbose)
return geom_a.intersection(geom_b)
except Exception as e:
logger.warning(f"交集计算失败,已跳过,原因:{str(e)}")
return None
def union(geom_a: BaseGeometry, geom_b: BaseGeometry, verbose: bool = False) -> BaseGeometry | None:
"""返回两几何的并集。
Parameters
----------
geom_a:
第一个几何。
geom_b:
第二个几何。
verbose:
是否输出修复几何的警告信息,默认 False。
Returns
-------
BaseGeometry | None
两几何的并集;失败时返回 None。
"""
try:
geom_a = ensure_valid_geometry(geom_a, verbose)
geom_b = ensure_valid_geometry(geom_b, verbose)
return geom_a.union(geom_b)
except Exception as e:
logger.warning(f"并集计算失败,已跳过,原因:{str(e)}")
return None
def difference(geom_a: BaseGeometry, geom_b: BaseGeometry, verbose: bool = False) -> BaseGeometry | None:
"""返回 ``geom_a`` 减去 ``geom_b`` 的差集。
Parameters
----------
geom_a:
第一个几何。
geom_b:
第二个几何。
verbose:
是否输出修复几何的警告信息,默认 False。
Returns
-------
BaseGeometry | None
geom_a 减去 geom_b 的差集;失败时返回 None。
"""
try:
geom_a = ensure_valid_geometry(geom_a, verbose)
geom_b = ensure_valid_geometry(geom_b, verbose)
return geom_a.difference(geom_b)
except Exception as e:
logger.warning(f"差集计算失败,已跳过,原因:{str(e)}")
return None
def symmetric_difference(geom_a: BaseGeometry, geom_b: BaseGeometry, verbose: bool = False) -> BaseGeometry | None:
"""返回两几何的对称差集(异或)。
Parameters
----------
geom_a:
第一个几何。
geom_b:
第二个几何。
verbose:
是否输出修复几何的警告信息,默认 False。
Returns
-------
BaseGeometry | None
两几何的对称差集;失败时返回 None。
"""
try:
geom_a = ensure_valid_geometry(geom_a, verbose)
geom_b = ensure_valid_geometry(geom_b, verbose)
return geom_a.symmetric_difference(geom_b)
except Exception as e:
logger.warning(f"对称差集计算失败,已跳过,原因:{str(e)}")
return None
def unary_union(geoms: Sequence[BaseGeometry], verbose: bool = False) -> BaseGeometry | None:
"""将多个几何合并为一个(等同于逐一 union
Parameters
----------
geoms:
几何对象序列。
verbose:
是否输出修复几何的警告信息,默认 False。
Returns
-------
BaseGeometry | None
合并后的几何对象;失败时返回 None。
"""
try:
# 检查几何有效性并尝试修复
fixed_geoms = []
for i, geom in enumerate(geoms):
try:
fixed = ensure_valid_geometry(geom, verbose)
if fixed is not None:
fixed_geoms.append(fixed)
else:
# 无法修复的几何跳过
if verbose:
logger.warning(f"几何对象 {i} 无效且无法修复,已跳过")
except Exception as e:
logger.warning(f"几何对象 {i} 处理失败,已跳过,原因:{str(e)}")
if not fixed_geoms:
logger.warning("没有有效的几何对象可合并")
return None
return shapely.unary_union(fixed_geoms)
except Exception as e:
logger.warning(f"几何合并失败,已跳过,原因:{str(e)}")
return None
# ── 空间关系判断 ───────────────────────────────────────────────────────────────
def contains(geom_a: BaseGeometry, geom_b: BaseGeometry, verbose: bool = False) -> bool:
"""判断 ``geom_a`` 是否完全包含 ``geom_b``。
Parameters
----------
geom_a:
第一个几何。
geom_b:
第二个几何。
verbose:
是否输出修复几何的警告信息,默认 False。
Returns
-------
bool
如果 geom_a 完全包含 geom_b返回 True否则返回 False失败时返回 False。
"""
try:
geom_a = ensure_valid_geometry(geom_a, verbose)
geom_b = ensure_valid_geometry(geom_b, verbose)
return bool(geom_a.contains(geom_b))
except Exception as e:
logger.warning(f"包含关系判断失败,已返回 False原因{str(e)}")
return False
def within(geom_a: BaseGeometry, geom_b: BaseGeometry, verbose: bool = False) -> bool:
"""判断 ``geom_a`` 是否完全在 ``geom_b`` 内。
Parameters
----------
geom_a:
第一个几何。
geom_b:
第二个几何。
verbose:
是否输出修复几何的警告信息,默认 False。
Returns
-------
bool
如果 geom_a 完全在 geom_b 内,返回 True否则返回 False失败时返回 False。
"""
try:
geom_a = ensure_valid_geometry(geom_a, verbose)
geom_b = ensure_valid_geometry(geom_b, verbose)
return bool(geom_a.within(geom_b))
except Exception as e:
logger.warning(f"包含于关系判断失败,已返回 False原因{str(e)}")
return False
def intersects(geom_a: BaseGeometry, geom_b: BaseGeometry, verbose: bool = False) -> bool:
"""判断两几何是否相交(含边界接触)。
Parameters
----------
geom_a:
第一个几何。
geom_b:
第二个几何。
verbose:
是否输出修复几何的警告信息,默认 False。
Returns
-------
bool
如果两几何相交,返回 True否则返回 False失败时返回 False。
"""
try:
geom_a = ensure_valid_geometry(geom_a, verbose)
geom_b = ensure_valid_geometry(geom_b, verbose)
return bool(geom_a.intersects(geom_b))
except Exception as e:
logger.warning(f"相交关系判断失败,已返回 False原因{str(e)}")
return False
def distance_between(geom_a: BaseGeometry, geom_b: BaseGeometry, verbose: bool = False) -> float:
"""计算两几何间的最小距离(单位与 CRS 一致)。
Parameters
----------
geom_a:
第一个几何。
geom_b:
第二个几何。
verbose:
是否输出修复几何的警告信息,默认 False。
Returns
-------
float
两几何间的最小距离;失败时返回无穷大。
"""
try:
geom_a = ensure_valid_geometry(geom_a, verbose)
geom_b = ensure_valid_geometry(geom_b, verbose)
return geom_a.distance(geom_b)
except Exception as e:
logger.warning(f"距离计算失败,已返回无穷大,原因:{str(e)}")
return float('inf')

218
app/core/projection.py Normal file
View File

@@ -0,0 +1,218 @@
"""
geo_tools.core.projection
~~~~~~~~~~~~~~~~~~~~~~~~~
坐标系与投影转换工具,基于 pyproj。
"""
from __future__ import annotations
from typing import Sequence
from pyproj import CRS, Transformer
import geopandas as gpd
from app.utils.logger import get_logger
logger = get_logger(__name__)
def get_crs_info(crs_input: str | int) -> dict[str, str | int | None]:
"""获取 CRS 的基本信息。
Parameters
----------
crs_input:
EPSG 代码(整数或 ``"EPSG:4326"`` 字符串)或 proj 字符串。
Returns
-------
dict
包含 ``name``、``epsg``、``unit``、``is_geographic``、``is_projected``、``datum``。
"""
crs = CRS.from_user_input(crs_input)
return {
"name": crs.name,
"epsg": crs.to_epsg(),
"unit": str(crs.axis_info[0].unit_name) if crs.axis_info else None,
"is_geographic": crs.is_geographic,
"is_projected": crs.is_projected,
"datum": crs.datum.name if crs.datum else None,
}
def crs_to_epsg(crs_input: str | int) -> int | None:
"""尝试将 CRS 转为 EPSG 整数编号,无法识别时返回 None。"""
try:
return CRS.from_user_input(crs_input).to_epsg()
except Exception:
return None
def transform_coordinates(
xs: Sequence[float],
ys: Sequence[float],
source_crs: str | int,
target_crs: str | int,
*,
always_xy: bool = True,
) -> tuple[list[float], list[float]]:
"""批量转换坐标点。
Parameters
----------
xs:
X 坐标序列(地理 CRS 时为经度)。
ys:
Y 坐标序列(地理 CRS 时为纬度)。
source_crs:
源 CRS。
target_crs:
目标 CRS。
always_xy:
强制以 (X, Y) 顺序输入输出(推荐保持 True
Returns
-------
(list[float], list[float])
转换后的 (xs, ys)。
"""
transformer = Transformer.from_crs(source_crs, target_crs, always_xy=always_xy)
result_xs, result_ys = transformer.transform(list(xs), list(ys))
return list(result_xs), list(result_ys)
def transform_point(
x: float,
y: float,
source_crs: str | int,
target_crs: str | int,
*,
always_xy: bool = True,
) -> tuple[float, float]:
"""转换单个坐标点。"""
xs, ys = transform_coordinates([x], [y], source_crs, target_crs, always_xy=always_xy)
return xs[0], ys[0]
def suggest_projected_crs(lon: float, lat: float, use_3degree: bool = True) -> str:
"""根据经纬度范围自动推荐适合面积/距离计算的投影 CRSCGCS2000 高斯-克吕格 带号)。
Parameters
----------
lon:
中心经度CGCS2000
lat:
中心纬度CGCS2000
use_3degree:
True 表示3度分带False 表示6度分带。
Returns
-------
str
EPSG 代码字符串,如 ``"EPSG:32650"``CGCS2000 高斯-克吕格 带号)。
"""
if use_3degree:
# 3度分带计算中央经线 = 3° * n
central_meridian = round(lon / 3) * 3
zone_number = int(central_meridian / 3)
# CGCS2000 3度带投影定义
# 从第25带到45带75°E-135°E
if 75 <= central_meridian <= 135:
epsg = 4513 + zone_number - 25
else:
# 默认使用36带108°E
epsg = 4524
logger.warning("经度范围超出3度带范围默认使用36带108°E")
else:
# 6度分带计算中央经线 = 6° * n - 3°
central_meridian = round((lon + 3) / 6) * 6 - 3
zone_number = int((central_meridian + 3) / 6)
# CGCS2000 6度带投影定义
# 从第13带到23带75°E-135°E
if 75 <= central_meridian <= 135:
epsg = 4491 + zone_number - 13
else:
# 默认使用18带105°E
epsg = 4496
logger.warning("经度范围超出6度带范围默认使用18带105°E")
logger.debug("建议投影 CRSEPSG:%dlon=%.2f, lat=%.2f", epsg, lon, lat)
return f"EPSG:{epsg}"
def reproject_gdf(
gdf: gpd.GeoDataFrame,
target_crs: str | int | None = None,
*,
auto_crs: bool = False,
verbose: bool = True,
) -> gpd.GeoDataFrame:
"""将 GeoDataFrame要素类重投影到目标坐标系。
Parameters
----------
gdf:
输入 GeoDataFrame必须已定义 CRS。
target_crs:
目标 CRS如 ``"EPSG:4326"``、``"EPSG:4490"`` 或整数 ``4523``。
与 ``auto_crs=True`` 二选一。
auto_crs:
为 ``True`` 时忽略 ``target_crs``,根据数据中心点自动推荐
CGCS2000 高斯-克吕格 带号(默认使用 3度分带
verbose:
为 ``True`` 时在日志中打印投影前后的 CRS 信息。
Returns
-------
gpd.GeoDataFrame
重投影后的新 GeoDataFrame原始对象不变
Raises
------
ValueError
``gdf`` 未定义 CRS或 ``target_crs`` 与 ``auto_crs`` 均未指定。
Examples
--------
>>> # 指定目标 CRS
>>> gdf_proj = reproject_gdf(gdf, "EPSG:4490")
>>> # 自动推荐 CGCS2000 高斯-克吕格 带号 默认使用 3度分带
>>> gdf_utm = reproject_gdf(gdf, auto_crs=True)
>>> # 配合 GDB 读取
>>> gdf = geo_tools.readers.read_vector("data.gdb/图斑")
>>> gdf_proj = reproject_gdf(gdf, "EPSG:4326")
"""
if gdf.crs is None:
raise ValueError("GeoDataFrame 未定义 CRS请先调用 set_crs() 设置坐标系。")
if auto_crs:
# 先统一到地理坐标系,再取中心点推荐 CGCS2000 高斯-克吕格 带号
if gdf.crs.is_projected:
center = gdf.to_crs("EPSG:4490").geometry.union_all().centroid
else:
center = gdf.geometry.union_all().centroid
target_crs = suggest_projected_crs(center.x, center.y)
logger.info("auto_crs自动推荐投影 CRS = %s", target_crs)
if target_crs is None:
raise ValueError("请指定 target_crs或设置 auto_crs=True 自动推荐投影。")
src_crs_str = gdf.crs.to_string()
result = gdf.to_crs(target_crs)
if verbose:
tgt_info = get_crs_info(target_crs)
logger.info(
"要素类重投影完成:%s%s%s,单位:%s,要素数:%d",
src_crs_str,
tgt_info.get("epsg") or target_crs,
tgt_info.get("name"),
tgt_info.get("unit"),
len(result),
)
return result

103
app/core/raster.py Normal file
View File

@@ -0,0 +1,103 @@
"""
geo_tools.core.raster
~~~~~~~~~~~~~~~~~~~~~
栅格数据处理预留接口。
当前提供基于 rasterio 的核心读写骨架;
如需完整栅格分析功能,请安装可选依赖:
``pip install geo-tools[raster]``
"""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
import numpy as np
def _require_rasterio() -> Any:
"""检查 rasterio 是否可用,不可用时给出明确提示。"""
try:
import rasterio
return rasterio
except ImportError as exc:
raise ImportError(
"栅格处理功能需要 rasterio。\n"
"请执行pip install geo-tools[raster] 或 pip install rasterio"
) from exc
def read_raster(
path: str | Path,
band: int = 1,
) -> tuple["np.ndarray", dict[str, Any]]:
"""读取栅格文件(单波段)。
Parameters
----------
path:
GeoTIFF 或其他 GDAL 支持格式的路径。
band:
波段号1-indexed。
Returns
-------
(np.ndarray, dict)
栅格数组 和 rasterio 元数据字典(``meta``)。
"""
rasterio = _require_rasterio()
with rasterio.open(str(path)) as src:
data = src.read(band)
meta = src.meta.copy()
return data, meta
def write_raster(
data: "np.ndarray",
path: str | Path,
meta: dict[str, Any],
band: int = 1,
) -> Path:
"""将 numpy 数组写出为 GeoTIFF。
Parameters
----------
data:
2D numpy 数组(单波段)。
path:
输出路径(.tif
meta:
rasterio 元数据字典(从 ``read_raster`` 获取或自行构造)。
band:
写入的波段号1-indexed。
Returns
-------
Path
"""
rasterio = _require_rasterio()
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
meta.update({"count": 1, "dtype": str(data.dtype)})
with rasterio.open(str(path), "w", **meta) as dst:
dst.write(data, band)
return path
def get_raster_info(path: str | Path) -> dict[str, Any]:
"""获取栅格文件的基本元信息行列数、波段数、CRS、分辨率等"""
rasterio = _require_rasterio()
with rasterio.open(str(path)) as src:
return {
"width": src.width,
"height": src.height,
"count": src.count,
"dtype": src.dtypes[0],
"crs": str(src.crs),
"transform": src.transform,
"bounds": src.bounds,
"nodata": src.nodata,
"res": src.res,
}

215
app/core/vector.py Normal file
View File

@@ -0,0 +1,215 @@
"""
geo_tools.core.vector
~~~~~~~~~~~~~~~~~~~~~
基于 geopandas 的矢量要素处理函数。
"""
from __future__ import annotations
from typing import Any, Literal
import geopandas as gpd
import pandas as pd
from app.utils.logger import get_logger
from app.utils.validators import validate_geometry
logger = get_logger(__name__)
def reproject(gdf: gpd.GeoDataFrame, target_crs: str | int) -> gpd.GeoDataFrame:
"""将 GeoDataFrame 重投影到目标坐标系。
Parameters
----------
gdf:
输入 GeoDataFrame必须已定义 CRS。
target_crs:
目标 CRS如 ``"EPSG:3857"`` 或 ``4490``。
Returns
-------
gpd.GeoDataFrame
重投影后的 GeoDataFrame新对象原始不变
"""
if gdf.crs is None:
raise ValueError("GeoDataFrame 未定义 CRS请先设置坐标系。")
if gdf.crs.to_epsg() == (target_crs if isinstance(target_crs, int) else None):
return gdf # 已经是目标 CRS跳过
logger.debug("重投影:%s%s(共 %d 条)", gdf.crs, target_crs, len(gdf))
return gdf.to_crs(target_crs)
def set_crs(gdf: gpd.GeoDataFrame, crs: str | int, *, overwrite: bool = False) -> gpd.GeoDataFrame:
"""为没有 CRS 的 GeoDataFrame 设置坐标系(不重投影)。
Parameters
----------
gdf:
输入数据。
crs:
目标 CRS。
overwrite:
若为 ``True``,即使已有 CRS 也强制覆盖(危险操作,请确认坐标系正确)。
"""
if gdf.crs is not None and not overwrite:
raise ValueError(
f"GeoDataFrame 已有 CRS{gdf.crs}。若要覆盖,请传入 overwrite=True。"
)
return gdf.set_crs(crs, allow_override=overwrite)
def clip_to_extent(
gdf: gpd.GeoDataFrame,
bbox: tuple[float, float, float, float] | gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
"""按矩形范围或另一个 GeoDataFrame 裁切要素。
Parameters
----------
gdf:
待裁切的 GeoDataFrame。
bbox:
矩形范围 ``(minx, miny, maxx, maxy)`` 或用于裁切的 GeoDataFrame / GeoSeries。
Returns
-------
gpd.GeoDataFrame
"""
if isinstance(bbox, tuple):
from shapely.geometry import box as shapely_box
mask = shapely_box(*bbox)
result = gdf.clip(mask)
else:
if bbox.crs != gdf.crs:
bbox = bbox.to_crs(gdf.crs) # type: ignore
result = gdf.clip(bbox)
logger.debug("裁切完成:%d%d", len(gdf), len(result))
return result
def dissolve_by(
gdf: gpd.GeoDataFrame,
by: str | list[str],
aggfunc: str | dict[str, Any] = "first",
) -> gpd.GeoDataFrame:
"""按属性字段融合Dissolve几何要素。
Parameters
----------
gdf:
输入 GeoDataFrame。
by:
融合字段名或字段列表。
aggfunc:
属性聚合函数,参考 ``pd.DataFrame.groupby``。
Returns
-------
gpd.GeoDataFrame
融合后的 GeoDataFrame索引为 ``by`` 字段。
"""
logger.debug("按字段 %r 融合要素(%d 条 → ?", by, len(gdf))
result = gdf.dissolve(by=by, aggfunc=aggfunc).reset_index()
logger.debug("融合完成:%d", len(result))
return result
def explode_multipart(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
"""将多部分几何MultiPolygon 等)拆分为单部分要素。
Parameters
----------
gdf:
输入 GeoDataFrame。
Returns
-------
gpd.GeoDataFrame
拆分后索引已 reset。
"""
result = gdf.explode(index_parts=False).reset_index(drop=True)
logger.debug("多部分拆分:%d%d", len(gdf), len(result))
return result
def drop_invalid_geometries(gdf: gpd.GeoDataFrame, *, fix: bool = False) -> gpd.GeoDataFrame:
"""删除或修复无效几何。
Parameters
----------
gdf:
输入 GeoDataFrame。
fix:
若为 ``True``,尝试通过 ``buffer(0)`` 修复无效几何而非删除。
"""
stats = validate_geometry(gdf)
if stats["invalid"] == 0 and stats["null"] == 0:
return gdf
if fix:
from app.core.geometry import fix_geometry
gdf = gdf.copy()
mask = ~gdf.geometry.is_valid | gdf.geometry.isna()
gdf.loc[mask, "geometry"] = gdf.loc[mask, "geometry"].apply(fix_geometry) # type: ignore
logger.info("已修复 %d 个无效几何", stats["invalid"])
else:
before = len(gdf)
gdf = gdf[gdf.geometry.is_valid & gdf.geometry.notna()].copy()
logger.info("已删除 %d 个无效/空几何", before - len(gdf))
return gdf
def spatial_join(
left: gpd.GeoDataFrame,
right: gpd.GeoDataFrame,
how: Literal["left", "right", "inner"] = "left",
predicate: str = "intersects",
**kwargs: Any,
) -> gpd.GeoDataFrame:
"""空间连接(封装 geopandas.sjoin
Parameters
----------
left:
左侧 GeoDataFrame。
right:
右侧 GeoDataFrame。
how:
连接方式:``"left"``、``"right"``、``"inner"``。
predicate:
空间谓词:``"intersects"``、``"contains"``、``"within"``、``"touches"``。
"""
if left.crs != right.crs:
right = right.to_crs(left.crs) # type: ignore
result = gpd.sjoin(left, right, how=how, predicate=predicate, **kwargs)
logger.debug("空间连接完成:%d 条结果", len(result))
return result
def add_area_column(
gdf: gpd.GeoDataFrame,
col_name: str = "area_m2",
projected_crs: str = "EPSG:3857",
) -> gpd.GeoDataFrame:
"""添加面积列(单位:平方米)。
将数据临时投影到 ``projected_crs``(笛卡尔投影)计算面积后回填到原 GDF。
Parameters
----------
gdf:
输入 GeoDataFrame面要素
col_name:
新列名。
projected_crs:
用于面积计算的投影 CRS需为平面坐标系
"""
gdf = gdf.copy()
if gdf.crs is None or not gdf.crs.is_projected:
projected = gdf.to_crs(projected_crs)
else:
projected = gdf
gdf[col_name] = projected.geometry.area
return gdf