Files
geo_tools/scripts/其他工具/样点剔除统计表格.py
missum db51d41aef refactor: 重构项目结构,将geo_tools重命名为app并更新相关引用
- 将主包名从geo_tools改为app
- 更新所有模块中的引用路径
- 迁移并更新测试用例
- 添加项目规则文档
- 保持原有功能不变,仅进行结构调整
2026-04-12 19:49:56 +08:00

404 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import pandas as pd
import numpy as np
import os
import geopandas as gpd
from openpyxl import Workbook
from openpyxl.styles import Alignment, Font, Border, Side
from openpyxl.utils import get_column_letter
# 定义指标代码与单位的对应关系
INDICATOR_UNITS = {
# 基本指标
'PH': ('pH', '-'),
'ECA': ('交换性钙', 'cmol(½Ca²⁺)/kg'),
'EMG': ('交换性镁', 'cmol(½Mg²⁺)/kg'),
'TN': ('全氮', 'g/kg'),
'TP': ('全磷', 'g/kg'),
'TK': ('全钾', 'g/kg'),
'AS1': ('有效硫', 'mg/kg'),
'AB': ('有效硼', 'mg/kg'),
'AP': ('有效磷', 'mg/kg'),
'AFE': ('有效铁', 'mg/kg'),
'ACU': ('有效铜', 'mg/kg'),
'AZN': ('有效锌', 'mg/kg'),
'AMN': ('有效锰', 'mg/kg'),
'OM': ('有机质', 'g/kg'),
'GZCHD': ('耕层厚度', 'cm'),
'AK': ('速效钾', 'mg/kg'),
'CEC': ('阳离子交换量', 'cmol/kg'),
# 特殊指标 - 根据文件名对应字段
'FL': ('粉粒', '%'),
'NL': ('黏粒', '%'),
'SL': ('砂粒', '%'),
'TRRZPJZ': ('土壤容重', 'g/cm³'),
'TRZD': ('土壤质地', '分类'),
# 其他可能指标
'AMO': ('有效钼', 'mg/kg'),
'TSE': ('全硒', 'mg/kg'),
'YXTCHD': ('有效土层厚度', 'cm')
}
# 文件名到字段的映射
FILENAME_TO_FIELD = {
'粉粒': 'FL',
'黏粒': 'NL',
'砂粒': 'SL',
'表层容重': 'TRRZPJZ',
'土壤质地十二级分类': 'TRZD',
'双江县YXTCHD': 'YXTCHD'
}
# 扩展字段别名映射支持更多pH字段名
FIELD_ALIASES = {
'PH': ['pH', 'PH', 'ph'], # 支持pH的各种大小写形式
'ECA': ['交换性钙', 'ECA'],
'EMG': ['交换性镁', 'EMG'],
'TN': ['全氮', 'TN'],
'TP': ['全磷', 'TP'],
'TK': ['全钾', 'TK'],
'AS1': ['有效硫', 'AS1'],
'AB': ['有效硼', 'AB'],
'AP': ['有效磷', 'AP'],
'AFE': ['有效铁', 'AFE'],
'ACU': ['有效铜', 'ACU'],
'AZN': ['有效锌', 'AZN'],
'AMN': ['有效锰', 'AMN'],
'OM': ['有机质', 'OM'],
'GZCHD': ['耕层厚度', 'GZCHD'],
'AK': ['速效钾', 'AK'],
'CEC': ['阳离子交换量', 'CEC'],
'FL': ['粉粒', 'FL'],
'NL': ['黏粒', 'NL'],
'SL': ['砂粒', 'SL'],
'TRRZPJZ': ['土壤容重', 'TRRZPJZ'],
'TRZD': ['土壤质地', 'TRZD'],
'AMO': ['有效钼', 'AMO'],
'TSE': ['全硒', 'TSE'],
'YXTCHD': ['有效土层厚度', 'YXTCHD']
}
def find_shapefiles(folder_path):
"""在文件夹中递归查找所有的Shapefile文件"""
shapefiles = []
for root, dirs, files in os.walk(folder_path):
for file in files:
if file.lower().endswith('.shp'):
shapefiles.append(os.path.join(root, file))
return shapefiles
def read_shapefile_data(shapefile_path):
"""读取Shapefile数据并返回属性表"""
try:
print(f" 读取Shapefile: {os.path.basename(shapefile_path)}")
gdf = gpd.read_file(shapefile_path, encoding='utf-8')
print(f" 要素数量: {len(gdf)}")
print(f" 属性字段: {list(gdf.columns)}")
return gdf
except Exception as e:
print(f" 读取Shapefile失败: {e}")
try:
gdf = gpd.read_file(shapefile_path, encoding='gbk')
print(f" 使用GBK编码成功读取")
return gdf
except:
return None
def get_indicator_data(gdf, filename):
"""从GeoDataFrame中获取指标数据使用统一字段匹配逻辑"""
indicator_data = {}
basename = os.path.basename(filename).replace('.shp', '')
# 1. 首先尝试文件名映射
target_field = None
if basename in FILENAME_TO_FIELD:
target_field = FILENAME_TO_FIELD[basename]
if target_field in gdf.columns:
indicator_data[target_field] = gdf[target_field]
print(f" 通过文件名映射找到字段: {target_field}")
else:
# 尝试通过别名查找
for indicator_code in INDICATOR_UNITS.keys():
if target_field == indicator_code:
for alias in FIELD_ALIASES.get(indicator_code, []):
if alias in gdf.columns:
indicator_data[indicator_code] = gdf[alias]
print(f" 通过文件名映射+别名找到字段: {alias} -> {indicator_code}")
break
# 2. 如果没有通过文件名找到,尝试直接匹配所有指标和别名
if not indicator_data:
for indicator_code in INDICATOR_UNITS.keys():
# 先尝试直接匹配指标代码
if indicator_code in gdf.columns:
indicator_data[indicator_code] = gdf[indicator_code]
print(f" 直接匹配字段: {indicator_code}")
continue
# 再尝试匹配别名
aliases = FIELD_ALIASES.get(indicator_code, [])
for alias in aliases:
if alias in gdf.columns:
indicator_data[indicator_code] = gdf[alias]
print(f" 通过别名匹配: {alias} -> {indicator_code}")
break
# 3. 额外检查:如果文件名包含特定关键词,尝试匹配
if not indicator_data:
filename_lower = basename.lower()
for indicator_code, (chinese_name, unit) in INDICATOR_UNITS.items():
if indicator_code.lower() in filename_lower or chinese_name in filename_lower:
# 尝试匹配指标代码或中文名
if indicator_code in gdf.columns:
indicator_data[indicator_code] = gdf[indicator_code]
print(f" 通过文件名关键词匹配: {indicator_code}")
break
elif chinese_name in gdf.columns:
indicator_data[indicator_code] = gdf[chinese_name]
print(f" 通过文件名关键词匹配中文名: {chinese_name} -> {indicator_code}")
break
return indicator_data
def get_combined_stats_from_folder(folder_path, folder_name="数据"):
"""从文件夹中所有shapefile合并统计指定指标"""
shapefiles = find_shapefiles(folder_path)
if not shapefiles:
print(f" 未找到Shapefile文件")
return pd.DataFrame()
print(f" 找到 {len(shapefiles)} 个Shapefile文件")
all_data = {code: [] for code in INDICATOR_UNITS.keys()}
for i, shp_file in enumerate(shapefiles, 1):
print(f"\n [{i}] 处理文件: {os.path.basename(shp_file)}")
gdf = read_shapefile_data(shp_file)
if gdf is not None:
indicator_data = get_indicator_data(gdf, shp_file)
for indicator_code, data_series in indicator_data.items():
if indicator_code in all_data:
# 转换为数值类型,处理可能的非数值数据
try:
data_series = pd.to_numeric(data_series, errors='coerce')
valid_data = data_series.dropna()
if len(valid_data) > 0:
all_data[indicator_code].extend(valid_data.tolist())
print(f" 提取 {indicator_code}: {len(valid_data)} 个值")
except Exception as e:
print(f" 处理 {indicator_code} 数据时出错: {e}")
# 计算每个指标的合并统计
stats_list = []
for indicator_code, (chinese_name, unit) in INDICATOR_UNITS.items():
data_list = all_data.get(indicator_code, [])
if not data_list:
continue
data_series = pd.Series(data_list)
# 过滤极端值(可选,根据实际需求调整)
data_series = data_series[(data_series >= 0) | pd.isna(data_series)]
if len(data_series) == 0:
continue
# 关键修复1计算总体标准差ddof=0而不是默认的样本标准差ddof=1
std_dev = data_series.std(ddof=0)
mean_val = data_series.mean()
# 关键修复2优化变异系数计算
if abs(mean_val) < 1e-8: # 均值接近0时
cv_value = 0.0
else:
# CV = (标准差 / 均值) * 100保留2位小数
cv_value = round((std_dev / mean_val) * 100, 2)
stats = {
'指标代码': indicator_code,
'指标': chinese_name,
'单位': unit,
'样点数': int(len(data_series)),
'Min': round(float(data_series.min()), 2),
'Max': round(float(data_series.max()), 2),
'Mean': round(float(mean_val), 2),
'Std': round(float(std_dev), 2), # 使用总体标准差
'CV': cv_value
}
stats_list.append(stats)
print(f" 统计 {chinese_name}({indicator_code}): {len(data_series)} 个样点")
if stats_list:
stats_df = pd.DataFrame(stats_list)
stats_df = stats_df.sort_values('指标')
print(f"\n 总共统计到 {len(stats_df)} 个指标")
return stats_df
print(" 未找到任何指标数据")
return pd.DataFrame()
def create_statistics_excel(before_folder, after_folder, output_path):
"""创建融合的统计表格,在剔除后表格前加一列剔除前样点数和剔除样点数"""
workbook = Workbook()
# 移除默认sheet
if 'Sheet' in workbook.sheetnames:
default_sheet = workbook['Sheet']
workbook.remove(default_sheet)
# 定义样式
thin_border = Border(
left=Side(style='thin'),
right=Side(style='thin'),
top=Side(style='thin'),
bottom=Side(style='thin')
)
print("=" * 60)
print("开始分析样点数据")
print("=" * 60)
# 分析剔除前数据
before_stats = None
if os.path.exists(before_folder):
print(f"\n[1] 分析剔除前数据:")
print(f"文件夹路径: {before_folder}")
before_stats = get_combined_stats_from_folder(before_folder, "剔除前")
if not before_stats.empty:
print(f"✓ 剔除前统计完成: {len(before_stats)} 个指标")
else:
print("✗ 剔除前未找到指定指标数据")
before_stats = None
else:
print(f"✗ 剔除前文件夹不存在: {before_folder}")
before_stats = None
# 分析剔除后数据
if os.path.exists(after_folder):
print(f"\n[2] 分析剔除后数据:")
print(f"文件夹路径: {after_folder}")
after_stats = get_combined_stats_from_folder(after_folder, "剔除后")
if not after_stats.empty:
# 创建融合的统计工作表
sheet_combined = workbook.create_sheet(title="样点统计")
# 新的表头:指标, 单位, 剔除前样点数, 剔除样点数, 剔除后样点数, Min, Max, Mean, Std, CV
combined_headers = ['指标', '单位', '剔除前样点数', '剔除样点数', '剔除后样点数', 'Min', 'Max', 'Mean',
'Std', 'CV']
for col, header in enumerate(combined_headers, 1):
cell = sheet_combined.cell(row=1, column=col, value=header)
cell.alignment = Alignment(horizontal='center', vertical='center')
cell.font = Font(bold=True)
cell.border = thin_border
# 写入数据
for row_idx, (index, after_row) in enumerate(after_stats.iterrows(), start=2):
# 查找对应的剔除前数据
before_sample_count = 0
before_row = None
if before_stats is not None:
# 首先尝试通过指标代码匹配
matching_rows = before_stats[before_stats['指标代码'] == after_row['指标代码']]
if not matching_rows.empty:
before_row = matching_rows.iloc[0]
else:
# 如果指标代码匹配失败,尝试通过指标名称匹配
matching_rows = before_stats[before_stats['指标'] == after_row['指标']]
if not matching_rows.empty:
before_row = matching_rows.iloc[0]
if before_row is not None:
before_sample_count = int(before_row['样点数'])
# 计算剔除样点数
after_sample_count = int(after_row['样点数'])
abnormal_count = max(0, before_sample_count - after_sample_count)
# 写入数据
sheet_combined.cell(row=row_idx, column=1, value=after_row['指标']) # 指标
sheet_combined.cell(row=row_idx, column=2, value=after_row['单位']) # 单位
sheet_combined.cell(row=row_idx, column=3, value=before_sample_count) # 剔除前样点数
sheet_combined.cell(row=row_idx, column=4, value=abnormal_count) # 剔除样点数
sheet_combined.cell(row=row_idx, column=5, value=after_sample_count) # 剔除后样点数
sheet_combined.cell(row=row_idx, column=6, value=after_row['Min']) # Min
sheet_combined.cell(row=row_idx, column=7, value=after_row['Max']) # Max
sheet_combined.cell(row=row_idx, column=8, value=after_row['Mean']) # Mean
sheet_combined.cell(row=row_idx, column=9, value=after_row['Std']) # Std
sheet_combined.cell(row=row_idx, column=10, value=after_row['CV']) # CV
# 设置所有单元格的样式
for col_idx in range(1, 11):
cell = sheet_combined.cell(row=row_idx, column=col_idx)
cell.alignment = Alignment(horizontal='center', vertical='center')
cell.border = thin_border
# 如果剔除了样点,高亮显示剔除样点数列
if abnormal_count > 0:
cell = sheet_combined.cell(row=row_idx, column=4) # 剔除样点数列
cell.font = Font(bold=True, color="FF0000") # 红色加粗
# 调整列宽
combined_column_widths = {
'指标': 15,
'单位': 12,
'剔除前样点数': 12,
'剔除样点数': 12,
'剔除后样点数': 12,
'Min': 10,
'Max': 10,
'Mean': 10,
'Std': 10,
'CV': 10
}
for col_idx, col_name in enumerate(combined_headers, 1):
column_letter = get_column_letter(col_idx)
if col_name in combined_column_widths:
sheet_combined.column_dimensions[column_letter].width = combined_column_widths[col_name]
print(f"\n✓ 融合统计完成: {len(after_stats)} 个指标")
# 输出匹配信息
if before_stats is not None:
print(f" 剔除前找到 {len(before_stats)} 个指标")
print(f" 剔除后找到 {len(after_stats)} 个指标")
print(
f" 成功匹配 {len([i for i in range(2, len(after_stats) + 2) if sheet_combined.cell(row=i, column=3).value > 0])} 个指标的剔除前数据")
else:
print("✗ 剔除后未找到指定指标数据")
sheet_combined = workbook.create_sheet(title="样点统计")
sheet_combined.cell(row=1, column=1, value="未找到指定指标数据")
else:
print(f"✗ 剔除后文件夹不存在: {after_folder}")
sheet_combined = workbook.create_sheet(title="样点统计")
sheet_combined.cell(row=1, column=1, value="剔除后文件夹不存在")
# 保存文件
workbook.save(output_path)
print(f"\n" + "=" * 60)
print(f"文件保存成功: {output_path}")
print("=" * 60)
# ================ 使用示例 ================
if __name__ == "__main__":
# 方式1: 处理单个样点数据对
before_folder = r"D:\a陆平\1.5实验数据\12月新版实验室数据\云南省实验室数据成果1218\永仁县20260127"
after_folder = r"D:\a陆平\1.5实验数据\12月新版实验室数据\云南省实验室数据成果1218\永仁县20260127剔除后"
output_path = r"D:\a陆平\1.5实验数据\12月新版实验室数据\云南省实验室数据成果1218\永仁县样点统计结果.xlsx"
# 执行
create_statistics_excel(before_folder, after_folder, output_path)