221 lines
6.8 KiB
Python
221 lines
6.8 KiB
Python
"""
|
||
geo_tools.core.projection
|
||
~~~~~~~~~~~~~~~~~~~~~~~~~
|
||
坐标系与投影转换工具,基于 pyproj。
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
from typing import Sequence
|
||
from pyproj import CRS, Transformer
|
||
import geopandas as gpd
|
||
|
||
from geo_tools.utils.logger import get_logger
|
||
|
||
logger = get_logger(__name__)
|
||
|
||
# ── 常用 CRS 快捷常量 ──────────────────────────────────────────────────────────
|
||
WGS84 = "EPSG:4326" # 地理坐标系(经纬度)
|
||
CGCS2000 = "EPSG:4490" # 中国国家大地坐标系 2000
|
||
WEB_MERCATOR = "EPSG:3857" # Web Mercator(米)
|
||
CGCS2000_UTM_50N = "EPSG:4508" # CGCS2000 / 3-degree Gauss-Kruger zone 50N
|
||
|
||
|
||
def get_crs_info(crs_input: str | int) -> dict[str, str | int | None]:
|
||
"""获取 CRS 的基本信息。
|
||
|
||
Returns
|
||
-------
|
||
dict
|
||
包含 ``name``、``epsg``、``unit``、``is_geographic``、``is_projected``。
|
||
"""
|
||
crs = CRS.from_user_input(crs_input)
|
||
return {
|
||
"name": crs.name,
|
||
"epsg": crs.to_epsg(),
|
||
"unit": str(crs.axis_info[0].unit_name) if crs.axis_info else None,
|
||
"is_geographic": crs.is_geographic,
|
||
"is_projected": crs.is_projected,
|
||
"datum": crs.datum.name if crs.datum else None,
|
||
}
|
||
|
||
|
||
def crs_to_epsg(crs_input: str | int) -> int | None:
|
||
"""尝试将 CRS 转为 EPSG 整数编号,无法识别时返回 None。"""
|
||
try:
|
||
return CRS.from_user_input(crs_input).to_epsg()
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
def transform_coordinates(
|
||
xs: Sequence[float],
|
||
ys: Sequence[float],
|
||
source_crs: str | int,
|
||
target_crs: str | int,
|
||
*,
|
||
always_xy: bool = True,
|
||
) -> tuple[list[float], list[float]]:
|
||
"""批量转换坐标点。
|
||
|
||
Parameters
|
||
----------
|
||
xs:
|
||
X 坐标序列(地理 CRS 时为经度)。
|
||
ys:
|
||
Y 坐标序列(地理 CRS 时为纬度)。
|
||
source_crs:
|
||
源 CRS。
|
||
target_crs:
|
||
目标 CRS。
|
||
always_xy:
|
||
强制以 (X, Y) 顺序输入输出(推荐保持 True)。
|
||
|
||
Returns
|
||
-------
|
||
(list[float], list[float])
|
||
转换后的 (xs, ys)。
|
||
"""
|
||
transformer = Transformer.from_crs(source_crs, target_crs, always_xy=always_xy)
|
||
result_xs, result_ys = transformer.transform(list(xs), list(ys))
|
||
return list(result_xs), list(result_ys)
|
||
|
||
|
||
def transform_point(
|
||
x: float,
|
||
y: float,
|
||
source_crs: str | int,
|
||
target_crs: str | int,
|
||
*,
|
||
always_xy: bool = True,
|
||
) -> tuple[float, float]:
|
||
"""转换单个坐标点。"""
|
||
xs, ys = transform_coordinates([x], [y], source_crs, target_crs, always_xy=always_xy)
|
||
return xs[0], ys[0]
|
||
|
||
|
||
def suggest_projected_crs(lon: float, lat: float, use_3degree: bool = True) -> str:
|
||
"""根据经纬度范围自动推荐适合面积/距离计算的投影 CRS(CGCS2000 高斯-克吕格 带号)。
|
||
|
||
Parameters
|
||
----------
|
||
lon:
|
||
中心经度(CGCS2000)。
|
||
lat:
|
||
中心纬度(CGCS2000)。
|
||
use_3degree:
|
||
True 表示3度分带,False 表示6度分带。
|
||
|
||
Returns
|
||
-------
|
||
str
|
||
EPSG 代码字符串,如 ``"EPSG:32650"``(CGCS2000 高斯-克吕格 带号)。
|
||
"""
|
||
if use_3degree:
|
||
# 3度分带计算:中央经线 = 3° * n
|
||
central_meridian = round(lon / 3) * 3
|
||
zone_number = int(central_meridian / 3)
|
||
|
||
# CGCS2000 3度带投影定义
|
||
# 从第25带到45带(75°E-135°E)
|
||
if 75 <= central_meridian <= 135:
|
||
epsg = 4513 + zone_number - 25
|
||
else:
|
||
# 默认使用36带(108°E)
|
||
epsg = 4524
|
||
logger.warning("经度范围超出3度带范围,默认使用36带(108°E)")
|
||
else:
|
||
# 6度分带计算:中央经线 = 6° * n - 3°
|
||
central_meridian = round((lon + 3) / 6) * 6 - 3
|
||
zone_number = int((central_meridian + 3) / 6)
|
||
|
||
# CGCS2000 6度带投影定义
|
||
# 从第13带到23带(75°E-135°E)
|
||
if 75 <= central_meridian <= 135:
|
||
epsg = 4491 + zone_number - 13
|
||
else:
|
||
# 默认使用18带(105°E)
|
||
epsg = 4496
|
||
logger.warning("经度范围超出6度带范围,默认使用18带(105°E)")
|
||
|
||
logger.debug("建议投影 CRS:EPSG:%d(lon=%.2f, lat=%.2f)", epsg, lon, lat)
|
||
return f"EPSG:{epsg}"
|
||
|
||
|
||
def reproject_gdf(
|
||
gdf: gpd.GeoDataFrame,
|
||
target_crs: str | int | None = None,
|
||
*,
|
||
auto_crs: bool = False,
|
||
verbose: bool = True,
|
||
) -> gpd.GeoDataFrame:
|
||
"""将 GeoDataFrame(要素类)重投影到目标坐标系。
|
||
|
||
Parameters
|
||
----------
|
||
gdf:
|
||
输入 GeoDataFrame,必须已定义 CRS。
|
||
target_crs:
|
||
目标 CRS,如 ``"EPSG:4326"``、``"EPSG:4490"`` 或整数 ``4523``。
|
||
与 ``auto_crs=True`` 二选一。
|
||
auto_crs:
|
||
为 ``True`` 时忽略 ``target_crs``,根据数据中心点自动推荐
|
||
CGCS2000 高斯-克吕格 带号(适合面积/距离计算场景)。
|
||
verbose:
|
||
为 ``True`` 时在日志中打印投影前后的 CRS 信息。
|
||
|
||
Returns
|
||
-------
|
||
gpd.GeoDataFrame
|
||
重投影后的新 GeoDataFrame(原始对象不变)。
|
||
|
||
Raises
|
||
------
|
||
ValueError
|
||
``gdf`` 未定义 CRS,或 ``target_crs`` 与 ``auto_crs`` 均未指定。
|
||
|
||
Examples
|
||
--------
|
||
>>> # 指定目标 CRS
|
||
>>> gdf_proj = reproject_gdf(gdf, "EPSG:4490")
|
||
|
||
>>> # 自动推荐 CGCS2000 高斯-克吕格 带号(用于面积计算)
|
||
>>> gdf_utm = reproject_gdf(gdf, auto_crs=True)
|
||
|
||
>>> # 配合 GDB 读取
|
||
>>> gdf = read_gdb("data.gdb", layer="图斑")
|
||
>>> gdf_proj = reproject_gdf(gdf, "EPSG:4326")
|
||
"""
|
||
|
||
|
||
if gdf.crs is None:
|
||
raise ValueError("GeoDataFrame 未定义 CRS,请先调用 set_crs() 设置坐标系。")
|
||
|
||
if auto_crs:
|
||
# 先统一到地理坐标系,再取中心点推荐 CGCS2000 高斯-克吕格 带号
|
||
if gdf.crs.is_projected:
|
||
center = gdf.to_crs("EPSG:4490").geometry.unary_union.centroid
|
||
else:
|
||
center = gdf.geometry.unary_union.centroid
|
||
target_crs = suggest_projected_crs(center.x, center.y)
|
||
logger.info("auto_crs:自动推荐投影 CRS = %s", target_crs)
|
||
|
||
if target_crs is None:
|
||
raise ValueError("请指定 target_crs,或设置 auto_crs=True 自动推荐投影。")
|
||
|
||
src_crs_str = gdf.crs.to_string()
|
||
result = gdf.to_crs(target_crs)
|
||
|
||
if verbose:
|
||
tgt_info = get_crs_info(target_crs)
|
||
logger.info(
|
||
"要素类重投影完成:%s → %s(%s,单位:%s,要素数:%d)",
|
||
src_crs_str,
|
||
tgt_info.get("epsg") or target_crs,
|
||
tgt_info.get("name"),
|
||
tgt_info.get("unit"),
|
||
len(result),
|
||
)
|
||
|
||
return result
|
||
|