291 lines
9.8 KiB
Python
291 lines
9.8 KiB
Python
import arcpy
|
||
import numpy as np
|
||
|
||
|
||
def get_data_type(data_path):
|
||
"""获取数据类型
|
||
|
||
:param data_path: 数据路径
|
||
:return: 数据类型
|
||
"""
|
||
if arcpy.Exists(data_path):
|
||
try:
|
||
desc = arcpy.Describe(data_path)
|
||
return desc.dataType
|
||
except:
|
||
return False
|
||
else:
|
||
return False
|
||
|
||
def get_config_key(every_string: str) -> str:
|
||
config_dict = {
|
||
"AB": "有效硼","ACU": "有效铜","AMN": "有效锰","AMO": "有效钼","AS1": "有效硫","AZN": "有效锌","CEC": "阳离子交换量","ECA": "交换性钙",
|
||
"EMG": "交换性镁","TSE": "全硒","TN": "全氮","TP": "全磷","TK": "全钾","AFE": "有效铁","AK": "速效钾","AP": "有效磷", "TRRZ": "土壤容重","LSFD":"砾石丰度",
|
||
"OM": "有机质","FL": "粉粒含量","NL": "黏粒含量","SL": "砂粒含量","PH": "土壤 pH","YXTCHD": "有效土层厚度","GZCHD": "耕作层厚度","TRZD": "土壤质地","TRZD12": "土壤质地",
|
||
"三普PH": "三普PH","二普PH": "二普PH","测土PH": "测土PH","二普-三普": "二普-三普","测土-三普": "测土-三普","二普-测土": "二普-测土"
|
||
}
|
||
try:
|
||
for key in config_dict.keys():
|
||
in_key = every_string.split("_")[0]
|
||
if key == in_key:
|
||
return key
|
||
|
||
return ""
|
||
|
||
except Exception as e:
|
||
return ""
|
||
|
||
def parse_raster_standard(standard_str):
|
||
"""解析重分类标准字符串,返回数值范围
|
||
|
||
例如:
|
||
">2.00" -> (2.0, float('inf'))
|
||
"1.00~2.00" -> (1.0, 2.0)
|
||
"≤0.20" -> (0, 0.2)
|
||
"""
|
||
if "," in standard_str:
|
||
temp = []
|
||
parts = standard_str.split(",\n")
|
||
for part in parts:
|
||
temp_part = parse_raster_standard(part)
|
||
temp.append(temp_part)
|
||
return temp
|
||
if ">" in standard_str:
|
||
value = float(standard_str.replace(">", ""))
|
||
return (value, float('inf'))
|
||
elif "~" in standard_str:
|
||
parts = standard_str.split("~")
|
||
return (float(parts[0]), float(parts[1]))
|
||
elif "≤" in standard_str:
|
||
value = float(standard_str.replace("≤", ""))
|
||
return (0, value)
|
||
else:
|
||
# 尝试直接解析为数值
|
||
try:
|
||
value = float(standard_str)
|
||
return (value, value)
|
||
except ValueError:
|
||
return None
|
||
|
||
def create_remap_table(standards_dict):
|
||
"""根据标准配置创建重分类映射表
|
||
|
||
参数:
|
||
standards_config -- 标准配置,格式为:
|
||
{"标准1":5-6, "标准2":7-8, ...}
|
||
remap_values -- 重分类值数组,默认为从1开始的整数序列
|
||
|
||
返回:
|
||
重分类映射表,格式为 [[old_min, old_max, new_value], ...]
|
||
"""
|
||
|
||
# 确保我们有一个有效的标准列表
|
||
if not standards_dict or not isinstance(standards_dict, dict):
|
||
print("警告: 没有有效的标准数据")
|
||
return []
|
||
|
||
# 设置重分类值
|
||
standards_length = len(standards_dict)
|
||
remap_values = list(range(1, 2*standards_length + 1))
|
||
|
||
remap_table = []
|
||
for i, (key, value) in enumerate(standards_dict.items()):
|
||
range_tuple = parse_raster_standard(value)
|
||
|
||
if range_tuple:
|
||
if type(range_tuple) is list:
|
||
m = 0
|
||
for range_tuple_item in range_tuple:
|
||
j = m * standards_length + i
|
||
remap_table.append([range_tuple_item[0], range_tuple_item[1], remap_values[j]])
|
||
m = m + 1
|
||
else:
|
||
remap_table.append([range_tuple[0], range_tuple[1], remap_values[i]])
|
||
|
||
return remap_table
|
||
|
||
|
||
def check_fields_exist_describe(feature_class, field_names):
|
||
"""
|
||
使用Describe函数检查要素类中字段是否存在
|
||
"""
|
||
try:
|
||
desc = arcpy.Describe(feature_class)
|
||
existing_fields = [field.name for field in desc.fields]
|
||
|
||
for field_name in field_names:
|
||
if field_name not in existing_fields:
|
||
return False
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"检查字段时出错: {e}")
|
||
return None
|
||
|
||
def get_grade_by_standard(value, grade_standards):
|
||
"""
|
||
通用的等级判断函数
|
||
value: 数值
|
||
grade_standards: 分级标准字典,如 {"等级一": ">2.00", "等级二": "1.00~2.00"}
|
||
"""
|
||
if value is None:
|
||
return "无数据"
|
||
|
||
# 按等级顺序检查(从高到低)
|
||
sorted_grades = sorted(grade_standards.items(),
|
||
key=lambda x: list(grade_standards.keys()).index(x[0]))
|
||
|
||
for grade_name, grade_standard in sorted_grades:
|
||
if is_value_in_grade(value, grade_standard):
|
||
return grade_name
|
||
|
||
return "超出范围"
|
||
|
||
def is_value_in_grade(value, grade_standard):
|
||
"""
|
||
判断数值是否在分级标准范围内
|
||
"""
|
||
# 处理特殊字符
|
||
grade_standard = grade_standard.replace('>', '>').replace('≤', '<=').replace('~', '~')
|
||
|
||
# 处理多范围情况(如pH值)
|
||
if ',' in grade_standard:
|
||
ranges = grade_standard.split(',')
|
||
for range_str in ranges:
|
||
if is_value_in_single_range(value, range_str.strip()):
|
||
return True
|
||
return False
|
||
else:
|
||
return is_value_in_single_range(value, grade_standard)
|
||
|
||
def is_value_in_single_range(value, range_str):
|
||
"""
|
||
判断数值是否在单个范围内
|
||
"""
|
||
import re
|
||
|
||
# 提取数值
|
||
numbers = re.findall(r'[-+]?\d*\.\d+|\d+', range_str)
|
||
numbers = [float(num) for num in numbers]
|
||
|
||
if '>' in range_str and '~' in range_str:
|
||
# 格式:>下限~上限
|
||
return numbers[0] < value <= numbers[1]
|
||
elif '>' in range_str:
|
||
# 格式:>数值
|
||
return value > numbers[0]
|
||
elif '<=' in range_str:
|
||
# 格式:<=数值
|
||
return value <= numbers[0]
|
||
elif '~' in range_str:
|
||
# 格式:下限~上限
|
||
return numbers[0] < value <= numbers[1]
|
||
else:
|
||
# 无法解析,使用字符串匹配
|
||
return str(value) == range_str
|
||
|
||
def vectorized_grade_assignment(values, grade_standards):
|
||
"""
|
||
向量化的等级分配(性能更好)
|
||
"""
|
||
# 确保输入值是数值类型,如果是字符串则转换为浮点数
|
||
if isinstance(values, np.ndarray) and values.dtype.kind in 'OUS': # 字符串类型
|
||
values = values.astype(float)
|
||
elif hasattr(values, 'dtype') and values.dtype == object: # 对象类型,可能包含字符串
|
||
values = values.astype(float)
|
||
|
||
conditions = []
|
||
choices = []
|
||
|
||
# 按等级顺序构建条件
|
||
# 创建两个列表来分别存储上段和下段范围
|
||
upper_ranges = []
|
||
lower_ranges = []
|
||
|
||
# 遍历排序后的等级
|
||
for i, (level, ranges) in enumerate(sorted(grade_standards.items(), key=lambda x: list(grade_standards.keys()).index(x[0])), 1):
|
||
# 分割范围字符串
|
||
range_list = [r.strip() for r in ranges.split(',')]
|
||
|
||
if len(range_list) >= 1:
|
||
upper_ranges.append((i, range_list[0]))
|
||
|
||
if len(range_list) >= 2:
|
||
# 计算下段范围的索引(原始索引 + 等级总数)
|
||
lower_index = i + len(grade_standards)
|
||
lower_ranges.append((lower_index, range_list[1]))
|
||
|
||
# 合并结果
|
||
sorted_grades = upper_ranges + lower_ranges
|
||
# sorted_grades = sorted(grade_standards.items(), key=lambda x: list(grade_standards.keys()).index(x[0]))
|
||
|
||
for grade_name, grade_standard in sorted_grades:
|
||
condition = create_condition(values, grade_standard)
|
||
conditions.append(condition)
|
||
choices.append(grade_name)
|
||
|
||
# 使用np.select进行向量化操作
|
||
result = np.select(conditions, choices, default="超出范围")
|
||
return result
|
||
|
||
def create_condition(values, grade_standard):
|
||
"""
|
||
创建numpy条件
|
||
"""
|
||
# 清理字符串:替换特殊字符并移除换行符和空格
|
||
grade_standard = (grade_standard.replace('>', '>')
|
||
.replace('≤', '<=')
|
||
.replace('~', '~')
|
||
.replace('\n', '') # 移除换行符
|
||
.replace(' ', '')) # 移除空格
|
||
|
||
if ',' in grade_standard:
|
||
# 多范围处理
|
||
ranges = grade_standard.split(',')
|
||
condition = None
|
||
for range_str in ranges:
|
||
if range_str: # 确保不是空字符串
|
||
range_condition = create_single_condition(values, range_str.strip())
|
||
if condition is None:
|
||
condition = range_condition
|
||
else:
|
||
condition = condition | range_condition
|
||
return condition
|
||
else:
|
||
return create_single_condition(values, grade_standard)
|
||
|
||
def create_single_condition(values, range_str):
|
||
"""
|
||
创建单个范围的条件
|
||
"""
|
||
import re
|
||
|
||
# 调试输出,帮助排查问题
|
||
# print(f"处理范围字符串: '{range_str}'")
|
||
|
||
# 提取数字
|
||
numbers = re.findall(r'[-+]?\d*\.\d+|\d+', range_str)
|
||
numbers = [float(num) for num in numbers]
|
||
|
||
if not numbers:
|
||
raise ValueError(f"无法从字符串 '{range_str}' 中提取数字")
|
||
|
||
# 根据范围符号创建条件
|
||
if '>' in range_str and '<=' in range_str:
|
||
# 处理 >x<=y 的情况(虽然不常见)
|
||
return (values > numbers[0]) & (values <= numbers[1])
|
||
elif '>' in range_str and '~' in range_str:
|
||
return (values > numbers[0]) & (values <= numbers[1])
|
||
elif '>' in range_str:
|
||
return values > numbers[0]
|
||
elif '<=' in range_str:
|
||
return values <= numbers[0]
|
||
elif '~' in range_str:
|
||
return (values > numbers[0]) & (values <= numbers[1])
|
||
else:
|
||
# 如果是单个数字
|
||
try:
|
||
return values == float(range_str)
|
||
except ValueError:
|
||
raise ValueError(f"无法解析的范围字符串: '{range_str}'") |