import pandas as pd import numpy as np import os import geopandas as gpd from openpyxl import Workbook from openpyxl.styles import Alignment, Font, Border, Side from openpyxl.utils import get_column_letter # 定义指标代码与单位的对应关系 INDICATOR_UNITS = { # 基本指标 'PH': ('pH', '-'), 'ECA': ('交换性钙', 'cmol(½Ca²⁺)/kg'), 'EMG': ('交换性镁', 'cmol(½Mg²⁺)/kg'), 'TN': ('全氮', 'g/kg'), 'TP': ('全磷', 'g/kg'), 'TK': ('全钾', 'g/kg'), 'AS1': ('有效硫', 'mg/kg'), 'AB': ('有效硼', 'mg/kg'), 'AP': ('有效磷', 'mg/kg'), 'AFE': ('有效铁', 'mg/kg'), 'ACU': ('有效铜', 'mg/kg'), 'AZN': ('有效锌', 'mg/kg'), 'AMN': ('有效锰', 'mg/kg'), 'OM': ('有机质', 'g/kg'), 'GZCHD': ('耕层厚度', 'cm'), 'AK': ('速效钾', 'mg/kg'), 'CEC': ('阳离子交换量', 'cmol/kg'), # 特殊指标 - 根据文件名对应字段 'FL': ('粉粒', '%'), 'NL': ('黏粒', '%'), 'SL': ('砂粒', '%'), 'TRRZPJZ': ('土壤容重', 'g/cm³'), 'TRZD': ('土壤质地', '分类'), # 其他可能指标 'AMO': ('有效钼', 'mg/kg'), 'TSE': ('全硒', 'mg/kg'), 'YXTCHD': ('有效土层厚度', 'cm') } # 文件名到字段的映射 FILENAME_TO_FIELD = { '粉粒': 'FL', '黏粒': 'NL', '砂粒': 'SL', '表层容重': 'TRRZPJZ', '土壤质地十二级分类': 'TRZD', '双江县YXTCHD': 'YXTCHD' } # 扩展字段别名映射,支持更多pH字段名 FIELD_ALIASES = { 'PH': ['pH', 'PH', 'ph'], # 支持pH的各种大小写形式 'ECA': ['交换性钙', 'ECA'], 'EMG': ['交换性镁', 'EMG'], 'TN': ['全氮', 'TN'], 'TP': ['全磷', 'TP'], 'TK': ['全钾', 'TK'], 'AS1': ['有效硫', 'AS1'], 'AB': ['有效硼', 'AB'], 'AP': ['有效磷', 'AP'], 'AFE': ['有效铁', 'AFE'], 'ACU': ['有效铜', 'ACU'], 'AZN': ['有效锌', 'AZN'], 'AMN': ['有效锰', 'AMN'], 'OM': ['有机质', 'OM'], 'GZCHD': ['耕层厚度', 'GZCHD'], 'AK': ['速效钾', 'AK'], 'CEC': ['阳离子交换量', 'CEC'], 'FL': ['粉粒', 'FL'], 'NL': ['黏粒', 'NL'], 'SL': ['砂粒', 'SL'], 'TRRZPJZ': ['土壤容重', 'TRRZPJZ'], 'TRZD': ['土壤质地', 'TRZD'], 'AMO': ['有效钼', 'AMO'], 'TSE': ['全硒', 'TSE'], 'YXTCHD': ['有效土层厚度', 'YXTCHD'] } def find_shapefiles(folder_path): """在文件夹中递归查找所有的Shapefile文件""" shapefiles = [] for root, dirs, files in os.walk(folder_path): for file in files: if file.lower().endswith('.shp'): shapefiles.append(os.path.join(root, file)) return shapefiles def read_shapefile_data(shapefile_path): """读取Shapefile数据并返回属性表""" try: print(f" 读取Shapefile: {os.path.basename(shapefile_path)}") gdf = gpd.read_file(shapefile_path, encoding='utf-8') print(f" 要素数量: {len(gdf)}") print(f" 属性字段: {list(gdf.columns)}") return gdf except Exception as e: print(f" 读取Shapefile失败: {e}") try: gdf = gpd.read_file(shapefile_path, encoding='gbk') print(f" 使用GBK编码成功读取") return gdf except: return None def get_indicator_data(gdf, filename): """从GeoDataFrame中获取指标数据,使用统一字段匹配逻辑""" indicator_data = {} basename = os.path.basename(filename).replace('.shp', '') # 1. 首先尝试文件名映射 target_field = None if basename in FILENAME_TO_FIELD: target_field = FILENAME_TO_FIELD[basename] if target_field in gdf.columns: indicator_data[target_field] = gdf[target_field] print(f" 通过文件名映射找到字段: {target_field}") else: # 尝试通过别名查找 for indicator_code in INDICATOR_UNITS.keys(): if target_field == indicator_code: for alias in FIELD_ALIASES.get(indicator_code, []): if alias in gdf.columns: indicator_data[indicator_code] = gdf[alias] print(f" 通过文件名映射+别名找到字段: {alias} -> {indicator_code}") break # 2. 如果没有通过文件名找到,尝试直接匹配所有指标和别名 if not indicator_data: for indicator_code in INDICATOR_UNITS.keys(): # 先尝试直接匹配指标代码 if indicator_code in gdf.columns: indicator_data[indicator_code] = gdf[indicator_code] print(f" 直接匹配字段: {indicator_code}") continue # 再尝试匹配别名 aliases = FIELD_ALIASES.get(indicator_code, []) for alias in aliases: if alias in gdf.columns: indicator_data[indicator_code] = gdf[alias] print(f" 通过别名匹配: {alias} -> {indicator_code}") break # 3. 额外检查:如果文件名包含特定关键词,尝试匹配 if not indicator_data: filename_lower = basename.lower() for indicator_code, (chinese_name, unit) in INDICATOR_UNITS.items(): if indicator_code.lower() in filename_lower or chinese_name in filename_lower: # 尝试匹配指标代码或中文名 if indicator_code in gdf.columns: indicator_data[indicator_code] = gdf[indicator_code] print(f" 通过文件名关键词匹配: {indicator_code}") break elif chinese_name in gdf.columns: indicator_data[indicator_code] = gdf[chinese_name] print(f" 通过文件名关键词匹配中文名: {chinese_name} -> {indicator_code}") break return indicator_data def get_combined_stats_from_folder(folder_path, folder_name="数据"): """从文件夹中所有shapefile合并统计指定指标""" shapefiles = find_shapefiles(folder_path) if not shapefiles: print(f" 未找到Shapefile文件") return pd.DataFrame() print(f" 找到 {len(shapefiles)} 个Shapefile文件") all_data = {code: [] for code in INDICATOR_UNITS.keys()} for i, shp_file in enumerate(shapefiles, 1): print(f"\n [{i}] 处理文件: {os.path.basename(shp_file)}") gdf = read_shapefile_data(shp_file) if gdf is not None: indicator_data = get_indicator_data(gdf, shp_file) for indicator_code, data_series in indicator_data.items(): if indicator_code in all_data: # 转换为数值类型,处理可能的非数值数据 try: data_series = pd.to_numeric(data_series, errors='coerce') valid_data = data_series.dropna() if len(valid_data) > 0: all_data[indicator_code].extend(valid_data.tolist()) print(f" 提取 {indicator_code}: {len(valid_data)} 个值") except Exception as e: print(f" 处理 {indicator_code} 数据时出错: {e}") # 计算每个指标的合并统计 stats_list = [] for indicator_code, (chinese_name, unit) in INDICATOR_UNITS.items(): data_list = all_data.get(indicator_code, []) if not data_list: continue data_series = pd.Series(data_list) # 过滤极端值(可选,根据实际需求调整) data_series = data_series[(data_series >= 0) | pd.isna(data_series)] if len(data_series) == 0: continue # 关键修复1:计算总体标准差(ddof=0),而不是默认的样本标准差(ddof=1) std_dev = data_series.std(ddof=0) mean_val = data_series.mean() # 关键修复2:优化变异系数计算 if abs(mean_val) < 1e-8: # 均值接近0时 cv_value = 0.0 else: # CV = (标准差 / 均值) * 100,保留2位小数 cv_value = round((std_dev / mean_val) * 100, 2) stats = { '指标代码': indicator_code, '指标': chinese_name, '单位': unit, '样点数': int(len(data_series)), 'Min': round(float(data_series.min()), 2), 'Max': round(float(data_series.max()), 2), 'Mean': round(float(mean_val), 2), 'Std': round(float(std_dev), 2), # 使用总体标准差 'CV': cv_value } stats_list.append(stats) print(f" 统计 {chinese_name}({indicator_code}): {len(data_series)} 个样点") if stats_list: stats_df = pd.DataFrame(stats_list) stats_df = stats_df.sort_values('指标') print(f"\n 总共统计到 {len(stats_df)} 个指标") return stats_df print(" 未找到任何指标数据") return pd.DataFrame() def create_statistics_excel(before_folder, after_folder, output_path): """创建融合的统计表格,在剔除后表格前加一列剔除前样点数和剔除样点数""" workbook = Workbook() # 移除默认sheet if 'Sheet' in workbook.sheetnames: default_sheet = workbook['Sheet'] workbook.remove(default_sheet) # 定义样式 thin_border = Border( left=Side(style='thin'), right=Side(style='thin'), top=Side(style='thin'), bottom=Side(style='thin') ) print("=" * 60) print("开始分析样点数据") print("=" * 60) # 分析剔除前数据 before_stats = None if os.path.exists(before_folder): print(f"\n[1] 分析剔除前数据:") print(f"文件夹路径: {before_folder}") before_stats = get_combined_stats_from_folder(before_folder, "剔除前") if not before_stats.empty: print(f"✓ 剔除前统计完成: {len(before_stats)} 个指标") else: print("✗ 剔除前未找到指定指标数据") before_stats = None else: print(f"✗ 剔除前文件夹不存在: {before_folder}") before_stats = None # 分析剔除后数据 if os.path.exists(after_folder): print(f"\n[2] 分析剔除后数据:") print(f"文件夹路径: {after_folder}") after_stats = get_combined_stats_from_folder(after_folder, "剔除后") if not after_stats.empty: # 创建融合的统计工作表 sheet_combined = workbook.create_sheet(title="样点统计") # 新的表头:指标, 单位, 剔除前样点数, 剔除样点数, 剔除后样点数, Min, Max, Mean, Std, CV combined_headers = ['指标', '单位', '剔除前样点数', '剔除样点数', '剔除后样点数', 'Min', 'Max', 'Mean', 'Std', 'CV'] for col, header in enumerate(combined_headers, 1): cell = sheet_combined.cell(row=1, column=col, value=header) cell.alignment = Alignment(horizontal='center', vertical='center') cell.font = Font(bold=True) cell.border = thin_border # 写入数据 for row_idx, (index, after_row) in enumerate(after_stats.iterrows(), start=2): # 查找对应的剔除前数据 before_sample_count = 0 before_row = None if before_stats is not None: # 首先尝试通过指标代码匹配 matching_rows = before_stats[before_stats['指标代码'] == after_row['指标代码']] if not matching_rows.empty: before_row = matching_rows.iloc[0] else: # 如果指标代码匹配失败,尝试通过指标名称匹配 matching_rows = before_stats[before_stats['指标'] == after_row['指标']] if not matching_rows.empty: before_row = matching_rows.iloc[0] if before_row is not None: before_sample_count = int(before_row['样点数']) # 计算剔除样点数 after_sample_count = int(after_row['样点数']) abnormal_count = max(0, before_sample_count - after_sample_count) # 写入数据 sheet_combined.cell(row=row_idx, column=1, value=after_row['指标']) # 指标 sheet_combined.cell(row=row_idx, column=2, value=after_row['单位']) # 单位 sheet_combined.cell(row=row_idx, column=3, value=before_sample_count) # 剔除前样点数 sheet_combined.cell(row=row_idx, column=4, value=abnormal_count) # 剔除样点数 sheet_combined.cell(row=row_idx, column=5, value=after_sample_count) # 剔除后样点数 sheet_combined.cell(row=row_idx, column=6, value=after_row['Min']) # Min sheet_combined.cell(row=row_idx, column=7, value=after_row['Max']) # Max sheet_combined.cell(row=row_idx, column=8, value=after_row['Mean']) # Mean sheet_combined.cell(row=row_idx, column=9, value=after_row['Std']) # Std sheet_combined.cell(row=row_idx, column=10, value=after_row['CV']) # CV # 设置所有单元格的样式 for col_idx in range(1, 11): cell = sheet_combined.cell(row=row_idx, column=col_idx) cell.alignment = Alignment(horizontal='center', vertical='center') cell.border = thin_border # 如果剔除了样点,高亮显示剔除样点数列 if abnormal_count > 0: cell = sheet_combined.cell(row=row_idx, column=4) # 剔除样点数列 cell.font = Font(bold=True, color="FF0000") # 红色加粗 # 调整列宽 combined_column_widths = { '指标': 15, '单位': 12, '剔除前样点数': 12, '剔除样点数': 12, '剔除后样点数': 12, 'Min': 10, 'Max': 10, 'Mean': 10, 'Std': 10, 'CV': 10 } for col_idx, col_name in enumerate(combined_headers, 1): column_letter = get_column_letter(col_idx) if col_name in combined_column_widths: sheet_combined.column_dimensions[column_letter].width = combined_column_widths[col_name] print(f"\n✓ 融合统计完成: {len(after_stats)} 个指标") # 输出匹配信息 if before_stats is not None: print(f" 剔除前找到 {len(before_stats)} 个指标") print(f" 剔除后找到 {len(after_stats)} 个指标") print( f" 成功匹配 {len([i for i in range(2, len(after_stats) + 2) if sheet_combined.cell(row=i, column=3).value > 0])} 个指标的剔除前数据") else: print("✗ 剔除后未找到指定指标数据") sheet_combined = workbook.create_sheet(title="样点统计") sheet_combined.cell(row=1, column=1, value="未找到指定指标数据") else: print(f"✗ 剔除后文件夹不存在: {after_folder}") sheet_combined = workbook.create_sheet(title="样点统计") sheet_combined.cell(row=1, column=1, value="剔除后文件夹不存在") # 保存文件 workbook.save(output_path) print(f"\n" + "=" * 60) print(f"文件保存成功: {output_path}") print("=" * 60) # ================ 使用示例 ================ if __name__ == "__main__": # 方式1: 处理单个样点数据对 before_folder = r"D:\a陆平\1.5实验数据\12月新版实验室数据\云南省实验室数据成果1218\永仁县20260127" after_folder = r"D:\a陆平\1.5实验数据\12月新版实验室数据\云南省实验室数据成果1218\永仁县20260127剔除后" output_path = r"D:\a陆平\1.5实验数据\12月新版实验室数据\云南省实验室数据成果1218\永仁县样点统计结果.xlsx" # 执行 create_statistics_excel(before_folder, after_folder, output_path)