import os from pathlib import Path import time import geopandas as gpd from geopandas.io import file import pandas as pd import numpy as np def assign_gzchd_flexible_v2(soil_prop, point_path, polygon_path, output_path): print("正在读取数据...") points = gpd.read_file(point_path) polygons = gpd.read_file(polygon_path) # 1. 坐标系转换 if points.crs != polygons.crs: print(f"坐标系不一致,正在转换点数据...") points = points.to_crs(polygons.crs) # 2. 预处理, 判断样点是否存在TZ字段,如果不存在,则用TDLYLX字段代替,并将其转为字符串类型,如果两个字段都不存在,则报错 if 'TZ' not in points.columns: if 'TDLYLX' in points.columns: points['TZ'] = points['TDLYLX'].astype(str).str.strip() else: raise ValueError("点要素类中不存在TZ或TDLYLX字段,无法进行匹配!") else: points['TZ'] = points['TZ'].astype(str).str.strip() polygons['TZ'] = polygons['TZ'].astype(str).str.strip() # 确保 GZCHD 是数值类型,避免合并时类型冲突 points[soil_prop] = pd.to_numeric(points[soil_prop], errors='coerce').fillna(0) if soil_prop in polygons.columns: polygons = polygons.drop(columns=[soil_prop]) # 辅助函数:按指定字段分组进行最近点匹配 def match_by_attribute(poly_gdf, pt_gdf, attr_name, suffix): if attr_name not in poly_gdf.columns or attr_name not in pt_gdf.columns: return None, [] poly_sub = poly_gdf[poly_gdf[attr_name].notna()].copy() point_sub = pt_gdf[pt_gdf[attr_name].notna()].copy() if poly_sub.empty or point_sub.empty: return None, [] poly_sub[attr_name] = poly_sub[attr_name].astype(str).str.strip() point_sub[attr_name] = point_sub[attr_name].astype(str).str.strip() common_values = set(poly_sub[attr_name].unique()) & set(point_sub[attr_name].unique()) if not common_values: return None, [] matched_parts = [] matched_ids = [] for value in common_values: poly_part = poly_sub[poly_sub[attr_name] == value].copy() point_part = point_sub[point_sub[attr_name] == value][[soil_prop, 'geometry']].copy() if poly_part.empty or point_part.empty: continue matched_part = gpd.sjoin_nearest(poly_part, point_part, how='left', rsuffix=suffix) matched_part = matched_part[~matched_part.index.duplicated(keep='first')] if not matched_part.empty: matched_parts.append(matched_part) matched_ids.extend(matched_part.index.tolist()) if matched_parts: return pd.concat(matched_parts), matched_ids return None, [] matched_results = [] matched_indices = [] # --- 第一步:按相同 TZ 匹配 --- print("步骤 1: 正在匹配相同 TZ 的最近点...") first_matched, first_ids = match_by_attribute(polygons, points, 'TZ', '_p1') if first_matched is not None and not first_matched.empty: matched_results.append(first_matched) matched_indices.extend(first_ids) # --- 第二步:按 TS 匹配未匹配面 --- unmatched_mask = ~polygons.index.isin(matched_indices) remaining_polygons = polygons[unmatched_mask].copy() print(f"步骤 2: 正在为 {len(remaining_polygons)} 个要素匹配 TS 最近点...") if not remaining_polygons.empty: ts_matched, ts_ids = match_by_attribute(remaining_polygons, points, 'TS', '_p_ts') if ts_matched is not None and not ts_matched.empty: matched_results.append(ts_matched) matched_indices.extend(ts_ids) remaining_polygons = polygons[~polygons.index.isin(matched_indices)].copy() print(f"已匹配 TS: {len(ts_ids)} 个要素,剩余 {len(remaining_polygons)} 个。") else: print("未匹配到 TS 类型要素,继续下一步。") # --- 第三步:按 YL 匹配未匹配面 --- if not remaining_polygons.empty: print(f"步骤 3: 正在为 {len(remaining_polygons)} 个要素匹配 YL 最近点...") yl_matched, yl_ids = match_by_attribute(remaining_polygons, points, 'YL', '_p_yl') if yl_matched is not None and not yl_matched.empty: matched_results.append(yl_matched) matched_indices.extend(yl_ids) remaining_polygons = polygons[~polygons.index.isin(matched_indices)].copy() print(f"已匹配 YL: {len(yl_ids)} 个要素,剩余 {len(remaining_polygons)} 个。") else: print("未匹配到 YL 类型要素,继续下一步。") # --- 第四步:按 TL 匹配未匹配面 --- if not remaining_polygons.empty: print(f"步骤 4: 正在为 {len(remaining_polygons)} 个要素匹配 TL 最近点...") tl_matched, tl_ids = match_by_attribute(remaining_polygons, points, 'TL', '_p_tl') if tl_matched is not None and not tl_matched.empty: matched_results.append(tl_matched) matched_indices.extend(tl_ids) remaining_polygons = polygons[~polygons.index.isin(matched_indices)].copy() print(f"已匹配 TL: {len(tl_ids)} 个要素,剩余 {len(remaining_polygons)} 个。") else: print("未匹配到 TL 类型要素,继续全局最近点。") else: print("没有未匹配的面要素,跳过 TS/YL/TL 匹配。") # --- 最后:全局最近点匹配剩余面 --- unmatched_mask = ~polygons.index.isin(matched_indices) remaining_polygons = polygons[unmatched_mask].copy() print(f"最后一步: 正在为 {len(remaining_polygons)} 个要素匹配全局最近点...") if not remaining_polygons.empty: point_pool = points[[soil_prop, 'geometry']] step_final = gpd.sjoin_nearest(remaining_polygons, point_pool, how='left', rsuffix='_p2') step_final = step_final[~step_final.index.duplicated(keep='first')] matched_results.append(step_final) # --- 第三步:稳健合并 --- print("正在合并数据...") # 过滤掉列表中可能的 None 或空 DataFrame,防止 FutureWarning to_concat = [res for res in matched_results if res is not None and not res.empty] if to_concat: final_gdf = pd.concat(to_concat) else: # 如果没有任何匹配结果,返回带空 GZCHD 的原面要素 final_gdf = polygons.copy() final_gdf[soil_prop] = 0 # --- 4. 清理与保存 --- # 删除临时列 cols_to_drop = [ c for c in final_gdf.columns if 'index_' in c or '_p1' in c or '_p2' in c or '_p_ts' in c or '_p_yl' in c or '_p_tl' in c ] final_gdf = final_gdf.drop(columns=cols_to_drop) # 强制去重复列名 final_gdf = final_gdf.loc[:, ~final_gdf.columns.duplicated()] # 填充空值并确保类型一致 if soil_prop in final_gdf.columns: final_gdf[soil_prop] = final_gdf[soil_prop].fillna(0) else: final_gdf[soil_prop] = 0 print(f"正在保存结果至: {output_path}") final_gdf.to_file(output_path, encoding='utf-8') print("处理完成!") return final_gdf if __name__ == "__main__": # 遍历文件夹中所有样点shp文件,并进行处理 shp_file = r"E:\@三普属性图出图\测试\YXTCHD.shp" # 样点数据文件夹 dltb_file = r"E:\@三普属性图出图\广西天峨县\@基础数据\土壤类型图\土壤类型图.shp" # 耕地图斑 output_folder = r"E:\@三普属性图出图\广西天峨县" # 输出文件夹 assign_gzchd_flexible_v2( soil_prop= "YXTCHD", # 耕地层厚度字段名 point_path=shp_file, # 样点数据 polygon_path= dltb_file, output_path=fr'{output_folder}\YXTCHD.shp' # 输出文件 ) time.sleep(2) # 防止文件读写冲突