Files
geo_tools/scripts/其他工具/A耕作层厚度栅格制作_新1.py
missum db51d41aef refactor: 重构项目结构,将geo_tools重命名为app并更新相关引用
- 将主包名从geo_tools改为app
- 更新所有模块中的引用路径
- 迁移并更新测试用例
- 添加项目规则文档
- 保持原有功能不变,仅进行结构调整
2026-04-12 19:49:56 +08:00

188 lines
7.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
from pathlib import Path
import time
import geopandas as gpd
from geopandas.io import file
import pandas as pd
import numpy as np
def assign_gzchd_flexible_v2(soil_prop, point_path, polygon_path, output_path):
print("正在读取数据...")
points = gpd.read_file(point_path)
polygons = gpd.read_file(polygon_path)
# 1. 坐标系转换
if points.crs != polygons.crs:
print(f"坐标系不一致,正在转换点数据...")
points = points.to_crs(polygons.crs)
# 2. 预处理, 判断样点是否存在TZ字段如果不存在则用TDLYLX字段代替,并将其转为字符串类型,如果两个字段都不存在,则报错
if 'TZ' not in points.columns:
if 'TDLYLX' in points.columns:
points['TZ'] = points['TDLYLX'].astype(str).str.strip()
else:
raise ValueError("点要素类中不存在TZ或TDLYLX字段无法进行匹配")
else:
points['TZ'] = points['TZ'].astype(str).str.strip()
polygons['TZ'] = polygons['TZ'].astype(str).str.strip()
# 确保 GZCHD 是数值类型,避免合并时类型冲突
points[soil_prop] = pd.to_numeric(points[soil_prop], errors='coerce').fillna(0)
if soil_prop in polygons.columns:
polygons = polygons.drop(columns=[soil_prop])
# 辅助函数:按指定字段分组进行最近点匹配
def match_by_attribute(poly_gdf, pt_gdf, attr_name, suffix):
if attr_name not in poly_gdf.columns or attr_name not in pt_gdf.columns:
return None, []
poly_sub = poly_gdf[poly_gdf[attr_name].notna()].copy()
point_sub = pt_gdf[pt_gdf[attr_name].notna()].copy()
if poly_sub.empty or point_sub.empty:
return None, []
poly_sub[attr_name] = poly_sub[attr_name].astype(str).str.strip()
point_sub[attr_name] = point_sub[attr_name].astype(str).str.strip()
common_values = set(poly_sub[attr_name].unique()) & set(point_sub[attr_name].unique())
if not common_values:
return None, []
matched_parts = []
matched_ids = []
for value in common_values:
poly_part = poly_sub[poly_sub[attr_name] == value].copy()
point_part = point_sub[point_sub[attr_name] == value][[soil_prop, 'geometry']].copy()
if poly_part.empty or point_part.empty:
continue
matched_part = gpd.sjoin_nearest(poly_part, point_part, how='left', rsuffix=suffix)
matched_part = matched_part[~matched_part.index.duplicated(keep='first')]
if not matched_part.empty:
matched_parts.append(matched_part)
matched_ids.extend(matched_part.index.tolist())
if matched_parts:
return pd.concat(matched_parts), matched_ids
return None, []
matched_results = []
matched_indices = []
# --- 第一步:按相同 TZ 匹配 ---
print("步骤 1: 正在匹配相同 TZ 的最近点...")
first_matched, first_ids = match_by_attribute(polygons, points, 'TZ', '_p1')
if first_matched is not None and not first_matched.empty:
matched_results.append(first_matched)
matched_indices.extend(first_ids)
# --- 第二步:按 TS 匹配未匹配面 ---
unmatched_mask = ~polygons.index.isin(matched_indices)
remaining_polygons = polygons[unmatched_mask].copy()
print(f"步骤 2: 正在为 {len(remaining_polygons)} 个要素匹配 TS 最近点...")
if not remaining_polygons.empty:
ts_matched, ts_ids = match_by_attribute(remaining_polygons, points, 'TS', '_p_ts')
if ts_matched is not None and not ts_matched.empty:
matched_results.append(ts_matched)
matched_indices.extend(ts_ids)
remaining_polygons = polygons[~polygons.index.isin(matched_indices)].copy()
print(f"已匹配 TS: {len(ts_ids)} 个要素,剩余 {len(remaining_polygons)} 个。")
else:
print("未匹配到 TS 类型要素,继续下一步。")
# --- 第三步:按 YL 匹配未匹配面 ---
if not remaining_polygons.empty:
print(f"步骤 3: 正在为 {len(remaining_polygons)} 个要素匹配 YL 最近点...")
yl_matched, yl_ids = match_by_attribute(remaining_polygons, points, 'YL', '_p_yl')
if yl_matched is not None and not yl_matched.empty:
matched_results.append(yl_matched)
matched_indices.extend(yl_ids)
remaining_polygons = polygons[~polygons.index.isin(matched_indices)].copy()
print(f"已匹配 YL: {len(yl_ids)} 个要素,剩余 {len(remaining_polygons)} 个。")
else:
print("未匹配到 YL 类型要素,继续下一步。")
# --- 第四步:按 TL 匹配未匹配面 ---
if not remaining_polygons.empty:
print(f"步骤 4: 正在为 {len(remaining_polygons)} 个要素匹配 TL 最近点...")
tl_matched, tl_ids = match_by_attribute(remaining_polygons, points, 'TL', '_p_tl')
if tl_matched is not None and not tl_matched.empty:
matched_results.append(tl_matched)
matched_indices.extend(tl_ids)
remaining_polygons = polygons[~polygons.index.isin(matched_indices)].copy()
print(f"已匹配 TL: {len(tl_ids)} 个要素,剩余 {len(remaining_polygons)} 个。")
else:
print("未匹配到 TL 类型要素,继续全局最近点。")
else:
print("没有未匹配的面要素,跳过 TS/YL/TL 匹配。")
# --- 最后:全局最近点匹配剩余面 ---
unmatched_mask = ~polygons.index.isin(matched_indices)
remaining_polygons = polygons[unmatched_mask].copy()
print(f"最后一步: 正在为 {len(remaining_polygons)} 个要素匹配全局最近点...")
if not remaining_polygons.empty:
point_pool = points[[soil_prop, 'geometry']]
step_final = gpd.sjoin_nearest(remaining_polygons, point_pool, how='left', rsuffix='_p2')
step_final = step_final[~step_final.index.duplicated(keep='first')]
matched_results.append(step_final)
# --- 第三步:稳健合并 ---
print("正在合并数据...")
# 过滤掉列表中可能的 None 或空 DataFrame防止 FutureWarning
to_concat = [res for res in matched_results if res is not None and not res.empty]
if to_concat:
final_gdf = pd.concat(to_concat)
else:
# 如果没有任何匹配结果,返回带空 GZCHD 的原面要素
final_gdf = polygons.copy()
final_gdf[soil_prop] = 0
# --- 4. 清理与保存 ---
# 删除临时列
cols_to_drop = [
c for c in final_gdf.columns
if 'index_' in c
or '_p1' in c
or '_p2' in c
or '_p_ts' in c
or '_p_yl' in c
or '_p_tl' in c
]
final_gdf = final_gdf.drop(columns=cols_to_drop)
# 强制去重复列名
final_gdf = final_gdf.loc[:, ~final_gdf.columns.duplicated()]
# 填充空值并确保类型一致
if soil_prop in final_gdf.columns:
final_gdf[soil_prop] = final_gdf[soil_prop].fillna(0)
else:
final_gdf[soil_prop] = 0
print(f"正在保存结果至: {output_path}")
final_gdf.to_file(output_path, encoding='utf-8')
print("处理完成!")
return final_gdf
if __name__ == "__main__":
# 遍历文件夹中所有样点shp文件并进行处理
shp_file = r"E:\@三普属性图出图\测试\YXTCHD.shp" # 样点数据文件夹
dltb_file = r"E:\@三普属性图出图\广西天峨县\@基础数据\土壤类型图\土壤类型图.shp" # 耕地图斑
output_folder = r"E:\@三普属性图出图\广西天峨县" # 输出文件夹
assign_gzchd_flexible_v2(
soil_prop= "YXTCHD", # 耕地层厚度字段名
point_path=shp_file, # 样点数据
polygon_path= dltb_file,
output_path=fr'{output_folder}\YXTCHD.shp' # 输出文件
)
time.sleep(2) # 防止文件读写冲突