Files
ArcGis_Py/tools/core/utils/平差工具.py
2026-04-22 12:27:49 +08:00

201 lines
8.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 获取每个一级地类面积主要是12类
import arcpy
import numpy as np
import pandas as pd
from .math_utils import correct_rounding_error
# 获取目标面积
def get_area_by_group(dltb_class_feature, excel_target_path, xzqmc, is_by_xzq=False):
try:
# 读取目标面积Excel文件
target_df = pd.read_excel(excel_target_path)
# 确保列名匹配
target_df.columns = target_df.columns.str.strip()
if is_by_xzq:
# 地类编码映射字典
land_type_mapping = {
'耕地': '01',
'园地': '02',
'林地': '03',
'草地': '04',
'其他地类': '12'
}
# 方法1重命名列后转换为字典
df_encoded = target_df.rename(columns=land_type_mapping)
result_dict = df_encoded.set_index('行政单位').to_dict('index')
return result_dict
# 检查要素类是否存在
if not arcpy.Exists(dltb_class_feature):
print(f"警告:输入要素类不存在: {dltb_class_feature}")
else:
# 转为numpy数组供pandas统计使用
df = pd.DataFrame(arcpy.da.TableToNumPyArray(dltb_class_feature, ["YJDLBM", "TBDLMJ"], skip_nulls=False, null_value=np.nan))
qtdl_df = df[df['YJDLBM'] == '12']
if qtdl_df['TBDLMJ'].isnull().any() or qtdl_df['TBDLMJ'].eq(0).any():
print("警告其他地类TBDLMJ字段 存在空值或无效的记录,将不平差其他地类")
target_areas = {}
else:
area_by_group = df.groupby("YJDLBM")["TBDLMJ"].sum()
for key in area_by_group.keys():
area_by_group[key] = area_by_group[key] * 0.0015
target_areas = area_by_group.to_dict()
# 获取铁山港区的目标面积
gangnan_target = target_df[target_df['行政单位'] == xzqmc]
if gangnan_target.empty:
print(f"警告:未找到{xzqmc}的目标面积数据,将使用TBDLMJ数据进行平差")
return target_areas
# 提取各土地利用类型的目标面积
landuse_types = {'01':'耕地', '02':'园地', '03':'林地', '04':'草地', '12':'其他地类'}
for dlbm, dlmc in landuse_types.items():
if dlmc in gangnan_target.columns:
if gangnan_target[dlmc].values[0] and not np.isnan(gangnan_target[dlmc].values[0]):
target_areas[dlbm] = gangnan_target[dlmc].values[0]
return target_areas
except Exception as e:
print(f"计算面积时出错: {str(e)}")
return {}
# 按地类平差(全区统一平差)
def adjust_area_statistics(stats_df, target_areas):
"""
根据Excel中的目标面积对统计数据进行平差处理
Parameters:
stats_df: 原始统计数据DataFrame
excel_target_path: 包含目标面积的Excel文件路径
Returns:
adjusted_df: 平差后的DataFrame
"""
try:
if target_areas is None:
print("警告:目标面积数据为空,不进行平差")
return stats_df
# 准备平差数据
adjusted_df = stats_df.copy()
if "YJDLBM" not in adjusted_df.columns:
dlbm = "YNDLBM"
else:
dlbm = "YJDLBM"
adjusted_df['adjusted_area'] = adjusted_df['temp_area']
adjusted_df['adjustment_factor'] = 1.0
# 计算每个地类的原始总面积
original_totals = stats_df.groupby(dlbm)['temp_area'].sum().to_dict()
# 对每个地类进行平差
for yjdl, target_area in target_areas.items():
if (yjdl in original_totals and original_totals[yjdl] > 0) or target_area > 0:
adjustment_factor = target_area / original_totals[yjdl]
# 应用平差系数
mask = adjusted_df[dlbm] == yjdl
adjusted_df.loc[mask, 'adjusted_area'] = adjusted_df.loc[mask, 'temp_area'] * adjustment_factor
adjusted_df.loc[mask, 'adjustment_factor'] = adjustment_factor
# 应用误差矫正,确保总和等于目标值
adjusted_areas = adjusted_df.loc[mask, 'adjusted_area'].tolist()
original_areas = stats_df.loc[mask, 'temp_area'].tolist()
corrected_areas = correct_rounding_error(target_area, adjusted_areas, original_areas)
adjusted_df.loc[mask, 'adjusted_area'] = corrected_areas
print(f"地类 {yjdl}: 平差系数 = {adjustment_factor:.6f}, 目标面积 = {target_area}, 矫正后总面积 = {sum(corrected_areas)}")
return adjusted_df
except Exception as e:
print(f"平差处理失败: {e}")
return stats_df
# 按行政区+地类进行平差
def adjust_by_district_landuse(stats_df:pd.DataFrame, target_areas_dict:dict):
"""
按行政区+地类进行平差
Parameters:
stats_df: 原始统计数据DataFrame
target_areas_dict: 目标面积字典,格式:{'行政区': {'地类': 目标面积}}
Returns:
adjusted_df: 平差后的DataFrame
"""
# 复制原始数据
adjusted_df = stats_df.copy()
adjusted_df['adjusted_area'] = adjusted_df['temp_area']
adjusted_df['adjustment_factor'] = 1.0
# 获取所有存在的行政区和地类
existing_districts = adjusted_df['XZQMC'].unique()
# 检查目标字典中的行政区是否存在
missing_districts = []
tt = [td for td in target_areas_dict.keys()]
for ed in existing_districts:
if ed not in tt:
missing_districts.append(ed)
# 如果有行政区不存在,返回原始数据并提示
if missing_districts:
print(f"警告:平差数据中不存在行政区: {missing_districts},未进行平差")
return stats_df
# 计算每个行政区每个地类的原始总面积
original_totals = stats_df.groupby(['XZQMC', 'YJDLBM'])['temp_area'].sum()
# 对每个行政区的每个地类进行平差
for xzqmc, landuse_targets in target_areas_dict.items():
for yjdl, target_area in landuse_targets.items():
# 检查该行政区是否有此地类数据
if (xzqmc, yjdl) in original_totals.index and original_totals.at[(xzqmc, yjdl)] > 0:
adjustment_factor = target_area / original_totals[(xzqmc, yjdl)]
# 应用平差系数
mask = (adjusted_df['XZQMC'] == xzqmc) & (adjusted_df['YJDLBM'] == yjdl)
adjusted_df.loc[mask, 'adjusted_area'] = adjusted_df.loc[mask, 'temp_area'] * adjustment_factor
adjusted_df.loc[mask, 'adjustment_factor'] = adjustment_factor
# 应用误差矫正,确保总和等于目标值
adjusted_areas = adjusted_df.loc[mask, 'adjusted_area'].tolist()
original_areas = stats_df.loc[mask, 'temp_area'].tolist()
corrected_areas = correct_rounding_error(target_area, adjusted_areas, original_areas)
adjusted_df.loc[mask, 'adjusted_area'] = corrected_areas
print(f"{xzqmc} - 地类 {yjdl}: 平差系数 = {adjustment_factor:.6f}, 目标面积 = {target_area}, 矫正后总面积 = {sum(corrected_areas)}")
return adjusted_df
def get_target_areas(excel_path:str, sheet_name:str, xzqmc:str) -> pd.DataFrame:
df_excel = pd.read_excel(excel_path, sheet_name)
target_df = df_excel[df_excel['行政单位'] == xzqmc]
df_area_for_merge = target_df.set_index('行政单位').iloc[0].reset_index(name='面积').rename(columns={'index': 'EJDL'})
return df_area_for_merge
def get_target_areas_by_group(excel_target_path):
# 读取目标面积Excel文件
target_df = pd.read_excel(excel_target_path,"Sheet1")
# 确保列名匹配
target_df.columns = target_df.columns.str.strip()
result_dict = target_df.set_index('行政单位').to_dict('index')
return result_dict