Files
ArcGis_Py/tools/core/raster_to_polygon.py
2026-04-22 12:27:49 +08:00

475 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
栅格处理模块: 提供栅格重分类、栅格转矢量和小面积图斑消除功能
设计用于通过 QProcess 调用,接收命令行参数,并通过标准输出返回结果和状态。
"""
import argparse
import json
import os
import random
import sys
import time
import traceback
import arcpy
import uuid
from pathlib import Path
from tools.core.utils.os_utils import temp_files_processor
try:
from utils import common_utils
except ImportError:
print("错误: 未找到 utils 模块。请确保 utils.py 文件存在或已添加到 PYTHONPATH。")
sys.exit(1)
def parse_arguments():
"""解析命令行参数"""
parser = argparse.ArgumentParser(description='处理栅格数据:重分类、转矢量、消除小图斑')
parser.add_argument('--input_raster', required=True, help='输入栅格文件路径')
parser.add_argument('--settings_path', required=True, help='配置文件路径')
args = parser.parse_args()
if args.settings_path:
with open(args.settings_path, 'r', encoding="utf-8") as settings_file:
settings = json.load(settings_file)
raster_settings = settings.get("raster_settings", {})
if raster_settings:
standards_dict_path = raster_settings.get("config_file_path", "")
with open(standards_dict_path, 'r', encoding="utf-8") as standards_file:
standards_dict = json.load(standards_file)
raster_name = Path(args.input_raster).stem
remap_table = common_utils.create_remap_table(standards_dict['export_config'][raster_name]["标准等级"])
raster_settings["remap_table"] = remap_table
raster_settings["input_raster"] = args.input_raster
else:
print("错误: 未找到有效配置文件")
sys.exit(1)
return raster_settings
def print_status(message):
"""
输出状态信息到标准输出,用于 GUI 实时显示
格式: STATUS: <message>
"""
print(f"STATUS:{message}")
sys.stdout.flush() # 确保立即输出
def print_result(success, output_path="", error_message=""):
"""
输出最终结果到标准输出,用于 GUI 判断任务状态和获取结果
格式: RESULT:True|<output_path>|
格式: RESULT:False||<error_message>
"""
if success:
print(f"RESULT:True|{output_path}|")
else:
# 在错误信息中替换换行符,避免干扰解析
cleaned_error_message = error_message.replace('\n', ' ').replace('\r', '')
print(f"RESULT:False||{cleaned_error_message}")
sys.stdout.flush() # 确保立即输出
def log_arcpy_message(message):
"""输出 ArcPy 产生的 geoprocessing 消息"""
# 可以在这里进一步处理或过滤 ArcPy 消息
if message.type == 'Message':
print_status(f"ArcPy消息: {message.message}")
elif message.type == 'Warning':
print_status(f"ArcPy警告: {message.message}")
elif message.type == 'Error':
# 对于错误,也可以记录到标准错误
print_status(f"ArcPy错误: {message.message}")
sys.stderr.write(f"ArcPyError:{message.message}\n")
sys.stderr.flush()
# --- 核心处理函数 ---
def reclassify_raster(input_raster, remap_table, temp_files_to_clean):
"""
根据重分类映射表重分类栅格数据,结果存储在内存中。
参数:
input_raster (str): 输入栅格路径
remap_table (list): 重分类映射表,格式为 [[from, to, new_value], ...]
temp_files_to_clean (list): 用于收集临时文件路径的列表
返回:
str: 内存栅格路径
"""
print_status(f"开始重分类栅格: {input_raster}")
try:
# 确保remap_table中的new_value是整数并确保格式正确
corrected_remap_table = []
for item in remap_table:
try:
if len(item) == 3:
from_value, to_value, new_value = item
# 尝试转换为浮点数以处理范围值
try:
from_value = float(from_value)
except (ValueError, TypeError):
print_status(f"警告: 跳过无效的重分类项起始值: {item[0]}")
continue
if isinstance(to_value, (int, float)) and to_value == float('inf'):
to_value = 10000 # 用一个很大的数值代替
elif isinstance(to_value, (int, float)) and to_value == float('-inf'):
to_value = -10000 # 用一个很小的数值代替
else:
try:
to_value = float(to_value)
except (ValueError, TypeError):
print_status(f"警告: 跳过无效的重分类项结束值: {item[1]}")
continue
try:
new_value = int(new_value)
except (ValueError, TypeError):
print_status(f"警告: 跳过无效的重分类项新值: {item[2]}")
continue
# 验证范围的有效性
if from_value > to_value:
print_status(f"警告: 跳过无效范围: {from_value} > {to_value}")
continue
corrected_remap_table.append([from_value, to_value, new_value])
else:
print_status(f"警告: 跳过无效的重分类项格式: {item}")
except Exception as e: # 捕获更广泛的异常
print_status(f"处理重分类项 {item} 时出错: {e}")
if not corrected_remap_table:
raise ValueError("重分类映射表为空或无效,无法执行重分类。")
# 创建重分类对象
remap = arcpy.sa.RemapRange(corrected_remap_table)
# 执行重分类到内存
# 检查输入栅格是否存在
if not arcpy.Exists(input_raster):
raise FileNotFoundError(f"输入栅格不存在: {input_raster}")
# 设置Snap Raster和Mask环境如果需要的话
# arcpy.env.snapRaster = input_raster
# arcpy.env.mask = input_raster # 如果需要按栅格范围裁剪
out_raster = arcpy.sa.Reclassify(input_raster, "VALUE", remap, "DATA")
# 保存到内存工作空间
temp_reclass_raster_mem = f"in_memory/reclass_int_{uuid.uuid4().hex[:8]}"
# 在返回之前保存到内存,并添加到清理列表
out_raster.save(temp_reclass_raster_mem)
temp_files_to_clean.append(temp_reclass_raster_mem)
print_status(f"重分类已完成: {temp_reclass_raster_mem}")
return temp_reclass_raster_mem
except Exception as e:
print_status(f"重分类过程出错: {str(e)}")
# 记录ArcPy消息
for msg in arcpy.GetMessages(2).split('\n'):
if msg:
print_status(f"ArcPy重分类错误详情: {msg}")
raise # 重新抛出异常
def eliminate_small_polygons(input_polygon, output_polygon, min_area, area_unit, temp_files_to_clean, current_iter=1, max_iterations=8, start_area=1000):
"""
递归消除小于指定面积的多边形,直至没有小图斑或达到最大迭代次数。
参数:
input_polygon (str): 输入多边形要素类路径
output_polygon (str): 最终输出多边形要素类路径
min_area (float): 最小面积阈值
area_unit (str): 面积单位
max_iterations (int): 最大递归次数
current_iter (int): 当前迭代次数
temp_files_to_clean (list): 用于收集临时文件路径的列表
返回:
str: 最终输出多边形要素类路径
"""
# print_status(f"开始第 {current_iter} 次消除小图斑...")
# 检查输入多边形是否存在
if not arcpy.Exists(input_polygon):
raise FileNotFoundError(f"输入多边形不存在: {input_polygon}")
try:
# 如果是第一次迭代,检查输出文件是否已存在,并删除
if current_iter == 1 and arcpy.Exists(output_polygon):
try:
arcpy.management.Delete(output_polygon)
print_status(f"已删除现有输出文件: {output_polygon}")
except Exception as delete_err:
print_status(f"警告: 无法删除现有输出文件 {output_polygon}: {str(delete_err)}")
#==================增加逐面积消除================
# 计算当前迭代的面积阈值
current_threshold = start_area * (2 ** (current_iter - 1))
# 如果当前阈值超过最终阈值,使用最终阈值
if current_threshold > min_area:
current_threshold = min_area
#==================增加逐面积消除================
# 创建临时内存图层以便选择
temp_layer = f"input_lyr_{uuid.uuid4().hex[:8]}"
arcpy.management.MakeFeatureLayer(input_polygon, temp_layer)[0]
# print_status(f"已创建临时图层: {temp_layer_name}")
# 添加或检查面积字段
area_field_name = "TEMP_AREA"
fields = [f.name for f in arcpy.ListFields(temp_layer)]
if area_field_name not in fields:
arcpy.management.AddField(temp_layer, area_field_name, "DOUBLE")
# print_status(f"已添加临时面积字段: {area_field_name}")
# 计算面积
arcpy.management.CalculateGeometryAttributes(
temp_layer,
[[area_field_name, "AREA"]],
None,
area_unit,
None,
"SAME_AS_INPUT"
)
# print_status("面积计算完成.")
# 选择小于阈值的多边形
selection_query = f"{arcpy.AddFieldDelimiters(temp_layer, area_field_name)} < {current_threshold}"
# print_status(f"选择查询: {selection_query}")
arcpy.management.SelectLayerByAttribute(temp_layer, "NEW_SELECTION", selection_query)
# 检查选中的要素数量
count = int(arcpy.management.GetCount(temp_layer).getOutput(0))
# print_status(f"发现 {count} 个小于 {min_area} {area_unit} 的小图斑.")
# 判断是否停止迭代
if count == 0 or current_iter >= max_iterations:
# print_status(f"复制最终结果到: {output_polygon}")
arcpy.management.CopyFeatures(input_polygon, output_polygon)
# 删除临时面积字段 (可选,如果需要保持输出干净)
# if area_field_name in fields: # 仅在字段是我们添加的情况下删除
# arcpy.management.DeleteField(output_polygon, area_field_name)
return output_polygon
else:
# print_status(f"执行消除操作...")
temp_eliminate_output = f"in_memory/temp_eliminate_{uuid.uuid4().hex[:8]}"
temp_files_to_clean.append(temp_eliminate_output) # 添加到临时文件列表
# 执行消除操作
arcpy.Eliminate_management(temp_layer, temp_eliminate_output, "LENGTH")
# print_status(f"执行融合操作...")
temp_dissolve_output = f"in_memory/dissolve_polygons_{uuid.uuid4().hex[:8]}"
temp_files_to_clean.append(temp_dissolve_output)
# 添加融合字段
dissolve_fields = ["gridcode"]
arcpy.management.Dissolve(temp_eliminate_output, temp_dissolve_output, dissolve_fields, multi_part="SINGLE_PART")
# print_status(f"融合结果已保存到内存: {temp_dissolve_output}")
# 递归调用,使用融合后的结果作为下一次迭代的输入
return eliminate_small_polygons(
temp_dissolve_output, # 下一次迭代使用临时融合输出作为输入
output_polygon,
min_area,
area_unit,
temp_files_to_clean,
current_iter + 1,
max_iterations,
start_area
)
except Exception as e:
print_status(f"消除小面积多边形过程出错 (迭代 {current_iter}): {str(e)}")
# 记录ArcPy消息
for msg in arcpy.GetMessages(2).split('\n'):
if msg:
print_status(f"ArcPy消除错误详情: {msg}")
raise # 重新抛出异常
finally:
# 确保删除临时图层,即使出错
if 'temp_layer' in locals() and arcpy.Exists(temp_layer):
try:
arcpy.management.Delete(temp_layer)
# print_status(f"已删除临时图层: {temp_layer.name}")
except Exception as delete_layer_err:
print_status(f"警告: 无法删除临时图层 {temp_layer}: {str(delete_layer_err)}")
# --- 主处理逻辑 ---
def main():
"""主函数:解析参数,执行处理流程,输出结果和状态"""
params = None
temp_files_to_clean = []
original_workspace = None
try:
# 1. 解析参数
params = parse_arguments()
input_raster = params["input_raster"]
raster_name = Path(input_raster).stem
input_folder = params["input_folder"]
output_folder = params["batch_output_folder"]
clip_features = params["clip_features"]
clip_enabled = params["clip_enabled"]
remap_table = params["remap_table"]
min_area = params["min_area"]
area_unit = params["area_unit"]
simplify = params["simplify"]
# print_status(f"解析参数完成: {params}")
# 2. 设置工作空间和环境
original_workspace = arcpy.env.workspace
arcpy.env.workspace = input_folder
arcpy.env.overwriteOutput = True
print_status(f"ArcPy 工作空间设置为: {arcpy.env.workspace}")
# 3. 校验输入/输出路径
if not arcpy.Exists(input_raster):
raise FileNotFoundError(f"输入栅格文件不存在: {input_raster}")
# 创建输出文件夹
if not os.path.exists(output_folder):
os.makedirs(output_folder)
print_status(f"已创建输出文件夹: {output_folder}")
# 创建面积统计用文件夹
disk_output_path = os.path.join(output_folder, "面积统计用栅格面")
time.sleep(random.random())
if not os.path.exists(disk_output_path):
os.makedirs(disk_output_path)
print_status(f"已创建面积统计文件夹: {disk_output_path}")
if clip_enabled and not arcpy.Exists(clip_features):
raise FileNotFoundError(f"裁剪要素类不存在: {clip_features}")
# 4. 定义中间和最终输出路径
# 根据输出文件夹类型确定文件扩展名和命名方式
output_is_workspace = common_utils.get_data_type(output_folder) in ["Workspace", "FeatureDataset", "Geodatabase"]
if output_is_workspace:
# 输出到地理数据库或要素数据集
final_output_path = os.path.join(os.path.dirname(output_folder), f"{raster_name}_eliminate.shp")
# final_output_path = f"{raster_name}_processed"
else:
# 输出到文件夹 (例如 Shapefile)
final_output_path = os.path.join(output_folder, f"{raster_name}_eliminate.shp")
# final_output_path = f"{raster_name}_processed.shp"
# 5. 执行重分类 (如果 remap_table 存在)
if remap_table:
reclassed_raster = reclassify_raster(input_raster, remap_table, temp_files_to_clean)
arcpy.Raster(reclassed_raster).save(os.path.join(output_folder,f"{Path(input_raster).stem}分级后.tif"))
# 6. 栅格转多边形
print_status(f"开始栅格转多边形: {reclassed_raster}")
temp_polygon_output = os.path.join("in_memory", f"raster_to_polygon_{uuid.uuid4().hex[:8]}")
temp_files_to_clean.append(temp_polygon_output)
simplify_value = "SIMPLIFY" if simplify else "NO_SIMPLIFY"
arcpy.conversion.RasterToPolygon(reclassed_raster, temp_polygon_output, simplify_value, "VALUE")
print_status(f"栅格转多边形完成,结果在内存: {temp_polygon_output}")
# 将内存中的要素类保存到硬盘
disk_output_polygon = os.path.join(disk_output_path, f"{raster_name}_reclassed_polygon.shp")
arcpy.CopyFeatures_management(temp_polygon_output, disk_output_polygon)
print_status(f"已将重分类转面结果保存到硬盘: {disk_output_polygon}")
# 7. 裁剪 (如果 clip_features 存在)
current_polygon_source = temp_polygon_output
if clip_enabled:
print_status(f"开始裁剪要素类: {current_polygon_source} using {clip_features}")
temp_cliped_polygon_output = os.path.join("in_memory", f"cliped_{uuid.uuid4().hex[:8]}")
temp_files_to_clean.append(temp_cliped_polygon_output)
arcpy.analysis.Clip(current_polygon_source, clip_features, temp_cliped_polygon_output)
current_polygon_source = temp_cliped_polygon_output
print_status(f"裁剪完成,结果在内存: {current_polygon_source}")
# 多部件至单部件 (通常在裁剪后进行,确保每个要素都是独立的单部件)
print_status(f"开始多部件至单部件转换: {current_polygon_source}")
temp_multi_to_single_output = os.path.join("in_memory", f"single_{uuid.uuid4().hex[:8]}")
temp_files_to_clean.append(temp_multi_to_single_output)
arcpy.management.MultipartToSinglepart(current_polygon_source, temp_multi_to_single_output)
current_polygon_source = temp_multi_to_single_output
print_status(f"多部件至单部件转换完成,结果在内存: {current_polygon_source}")
# 8. 消除小面积图斑
print_status(f"开始消除小面积图斑: {current_polygon_source} (阈值: {min_area} {area_unit})")
eliminate_small_polygons(
current_polygon_source,
final_output_path, # 直接传递最终输出路径
min_area,
area_unit,
temp_files_to_clean # 传递临时文件列表
)
print_status("消除小面积图斑完成.")
# 9. 最终清理和输出结果
print_status("处理流程全部完成.")
# 清理在内存或临时位置生成的中间文件
temp_files_processor.clean_up_temp_files(temp_files=temp_files_to_clean, workspace=original_workspace)
# 验证最终输出文件是否存在
if arcpy.Exists(final_output_path):
print_result(True, final_output_path, "")
else:
raise Exception(f"处理完成,但最终输出文件 {final_output_path} 不存在。")
except FileNotFoundError as fnf_e:
error_msg = f"文件不存在错误: {str(fnf_e)}"
print_status(error_msg)
print_result(False, "", error_msg)
except ValueError as ve:
error_msg = f"参数错误或数据校验失败: {str(ve)}"
print_status(error_msg)
print_result(False, "", error_msg)
except arcpy.ExecuteError:
# 捕获 ArcPy 执行错误
error_msg = f"ArcPy 执行错误: {arcpy.GetMessages(2)}"
print_status(error_msg)
sys.stderr.write(f"ArcPyExecuteError:{arcpy.GetMessages(2)}\n") # 记录到标准错误
print_result(False, "", error_msg)
except Exception as e:
# 捕获其他未预料的错误
error_msg = f"发生未预料的错误: {str(e)}\n{traceback.format_exc()}"
print_status(error_msg)
sys.stderr.write(f"UnexpectedError:{error_msg}\n") # 记录到标准错误
print_result(False, "", error_msg)
finally:
# 确保在任何情况下都尝试清理(尽管在 except 块中也调用了)
# 这里的调用是最后的保障,如果 except 块中的清理失败了
print_status("脚本结束,执行最终清理...")
temp_files_processor.clean_up_temp_files(temp_files=temp_files_to_clean, workspace=original_workspace)
print_status("最终清理完成.")
sys.exit(0) # 正常退出脚本
if __name__ == "__main__":
print_status("脚本开始执行...")
main()