# -*- coding: utf-8 -*- """ 栅格处理模块: 提供栅格重分类、栅格转矢量和小面积图斑消除功能 设计用于通过 QProcess 调用,接收命令行参数,并通过标准输出返回结果和状态。 """ import argparse import json import os import random import sys import time import traceback import arcpy import uuid from pathlib import Path from tools.core.utils.os_utils import temp_files_processor try: from utils import common_utils except ImportError: print("错误: 未找到 utils 模块。请确保 utils.py 文件存在或已添加到 PYTHONPATH。") sys.exit(1) def parse_arguments(): """解析命令行参数""" parser = argparse.ArgumentParser(description='处理栅格数据:重分类、转矢量、消除小图斑') parser.add_argument('--input_raster', required=True, help='输入栅格文件路径') parser.add_argument('--settings_path', required=True, help='配置文件路径') args = parser.parse_args() if args.settings_path: with open(args.settings_path, 'r', encoding="utf-8") as settings_file: settings = json.load(settings_file) raster_settings = settings.get("raster_settings", {}) if raster_settings: standards_dict_path = raster_settings.get("config_file_path", "") with open(standards_dict_path, 'r', encoding="utf-8") as standards_file: standards_dict = json.load(standards_file) raster_name = Path(args.input_raster).stem remap_table = common_utils.create_remap_table(standards_dict['export_config'][raster_name]["标准等级"]) raster_settings["remap_table"] = remap_table raster_settings["input_raster"] = args.input_raster else: print("错误: 未找到有效配置文件") sys.exit(1) return raster_settings def print_status(message): """ 输出状态信息到标准输出,用于 GUI 实时显示 格式: STATUS: """ print(f"STATUS:{message}") sys.stdout.flush() # 确保立即输出 def print_result(success, output_path="", error_message=""): """ 输出最终结果到标准输出,用于 GUI 判断任务状态和获取结果 格式: RESULT:True|| 格式: RESULT:False|| """ if success: print(f"RESULT:True|{output_path}|") else: # 在错误信息中替换换行符,避免干扰解析 cleaned_error_message = error_message.replace('\n', ' ').replace('\r', '') print(f"RESULT:False||{cleaned_error_message}") sys.stdout.flush() # 确保立即输出 def log_arcpy_message(message): """输出 ArcPy 产生的 geoprocessing 消息""" # 可以在这里进一步处理或过滤 ArcPy 消息 if message.type == 'Message': print_status(f"ArcPy消息: {message.message}") elif message.type == 'Warning': print_status(f"ArcPy警告: {message.message}") elif message.type == 'Error': # 对于错误,也可以记录到标准错误 print_status(f"ArcPy错误: {message.message}") sys.stderr.write(f"ArcPyError:{message.message}\n") sys.stderr.flush() # --- 核心处理函数 --- def reclassify_raster(input_raster, remap_table, temp_files_to_clean): """ 根据重分类映射表重分类栅格数据,结果存储在内存中。 参数: input_raster (str): 输入栅格路径 remap_table (list): 重分类映射表,格式为 [[from, to, new_value], ...] temp_files_to_clean (list): 用于收集临时文件路径的列表 返回: str: 内存栅格路径 """ print_status(f"开始重分类栅格: {input_raster}") try: # 确保remap_table中的new_value是整数,并确保格式正确 corrected_remap_table = [] for item in remap_table: try: if len(item) == 3: from_value, to_value, new_value = item # 尝试转换为浮点数以处理范围值 try: from_value = float(from_value) except (ValueError, TypeError): print_status(f"警告: 跳过无效的重分类项起始值: {item[0]}") continue if isinstance(to_value, (int, float)) and to_value == float('inf'): to_value = 10000 # 用一个很大的数值代替 elif isinstance(to_value, (int, float)) and to_value == float('-inf'): to_value = -10000 # 用一个很小的数值代替 else: try: to_value = float(to_value) except (ValueError, TypeError): print_status(f"警告: 跳过无效的重分类项结束值: {item[1]}") continue try: new_value = int(new_value) except (ValueError, TypeError): print_status(f"警告: 跳过无效的重分类项新值: {item[2]}") continue # 验证范围的有效性 if from_value > to_value: print_status(f"警告: 跳过无效范围: {from_value} > {to_value}") continue corrected_remap_table.append([from_value, to_value, new_value]) else: print_status(f"警告: 跳过无效的重分类项格式: {item}") except Exception as e: # 捕获更广泛的异常 print_status(f"处理重分类项 {item} 时出错: {e}") if not corrected_remap_table: raise ValueError("重分类映射表为空或无效,无法执行重分类。") # 创建重分类对象 remap = arcpy.sa.RemapRange(corrected_remap_table) # 执行重分类到内存 # 检查输入栅格是否存在 if not arcpy.Exists(input_raster): raise FileNotFoundError(f"输入栅格不存在: {input_raster}") # 设置Snap Raster和Mask环境,如果需要的话 # arcpy.env.snapRaster = input_raster # arcpy.env.mask = input_raster # 如果需要按栅格范围裁剪 out_raster = arcpy.sa.Reclassify(input_raster, "VALUE", remap, "DATA") # 保存到内存工作空间 temp_reclass_raster_mem = f"in_memory/reclass_int_{uuid.uuid4().hex[:8]}" # 在返回之前保存到内存,并添加到清理列表 out_raster.save(temp_reclass_raster_mem) temp_files_to_clean.append(temp_reclass_raster_mem) print_status(f"重分类已完成: {temp_reclass_raster_mem}") return temp_reclass_raster_mem except Exception as e: print_status(f"重分类过程出错: {str(e)}") # 记录ArcPy消息 for msg in arcpy.GetMessages(2).split('\n'): if msg: print_status(f"ArcPy重分类错误详情: {msg}") raise # 重新抛出异常 def eliminate_small_polygons(input_polygon, output_polygon, min_area, area_unit, temp_files_to_clean, current_iter=1, max_iterations=8, start_area=1000): """ 递归消除小于指定面积的多边形,直至没有小图斑或达到最大迭代次数。 参数: input_polygon (str): 输入多边形要素类路径 output_polygon (str): 最终输出多边形要素类路径 min_area (float): 最小面积阈值 area_unit (str): 面积单位 max_iterations (int): 最大递归次数 current_iter (int): 当前迭代次数 temp_files_to_clean (list): 用于收集临时文件路径的列表 返回: str: 最终输出多边形要素类路径 """ # print_status(f"开始第 {current_iter} 次消除小图斑...") # 检查输入多边形是否存在 if not arcpy.Exists(input_polygon): raise FileNotFoundError(f"输入多边形不存在: {input_polygon}") try: # 如果是第一次迭代,检查输出文件是否已存在,并删除 if current_iter == 1 and arcpy.Exists(output_polygon): try: arcpy.management.Delete(output_polygon) print_status(f"已删除现有输出文件: {output_polygon}") except Exception as delete_err: print_status(f"警告: 无法删除现有输出文件 {output_polygon}: {str(delete_err)}") #==================增加逐面积消除================ # 计算当前迭代的面积阈值 current_threshold = start_area * (2 ** (current_iter - 1)) # 如果当前阈值超过最终阈值,使用最终阈值 if current_threshold > min_area: current_threshold = min_area #==================增加逐面积消除================ # 创建临时内存图层以便选择 temp_layer = f"input_lyr_{uuid.uuid4().hex[:8]}" arcpy.management.MakeFeatureLayer(input_polygon, temp_layer)[0] # print_status(f"已创建临时图层: {temp_layer_name}") # 添加或检查面积字段 area_field_name = "TEMP_AREA" fields = [f.name for f in arcpy.ListFields(temp_layer)] if area_field_name not in fields: arcpy.management.AddField(temp_layer, area_field_name, "DOUBLE") # print_status(f"已添加临时面积字段: {area_field_name}") # 计算面积 arcpy.management.CalculateGeometryAttributes( temp_layer, [[area_field_name, "AREA"]], None, area_unit, None, "SAME_AS_INPUT" ) # print_status("面积计算完成.") # 选择小于阈值的多边形 selection_query = f"{arcpy.AddFieldDelimiters(temp_layer, area_field_name)} < {current_threshold}" # print_status(f"选择查询: {selection_query}") arcpy.management.SelectLayerByAttribute(temp_layer, "NEW_SELECTION", selection_query) # 检查选中的要素数量 count = int(arcpy.management.GetCount(temp_layer).getOutput(0)) # print_status(f"发现 {count} 个小于 {min_area} {area_unit} 的小图斑.") # 判断是否停止迭代 if count == 0 or current_iter >= max_iterations: # print_status(f"复制最终结果到: {output_polygon}") arcpy.management.CopyFeatures(input_polygon, output_polygon) # 删除临时面积字段 (可选,如果需要保持输出干净) # if area_field_name in fields: # 仅在字段是我们添加的情况下删除 # arcpy.management.DeleteField(output_polygon, area_field_name) return output_polygon else: # print_status(f"执行消除操作...") temp_eliminate_output = f"in_memory/temp_eliminate_{uuid.uuid4().hex[:8]}" temp_files_to_clean.append(temp_eliminate_output) # 添加到临时文件列表 # 执行消除操作 arcpy.Eliminate_management(temp_layer, temp_eliminate_output, "LENGTH") # print_status(f"执行融合操作...") temp_dissolve_output = f"in_memory/dissolve_polygons_{uuid.uuid4().hex[:8]}" temp_files_to_clean.append(temp_dissolve_output) # 添加融合字段 dissolve_fields = ["gridcode"] arcpy.management.Dissolve(temp_eliminate_output, temp_dissolve_output, dissolve_fields, multi_part="SINGLE_PART") # print_status(f"融合结果已保存到内存: {temp_dissolve_output}") # 递归调用,使用融合后的结果作为下一次迭代的输入 return eliminate_small_polygons( temp_dissolve_output, # 下一次迭代使用临时融合输出作为输入 output_polygon, min_area, area_unit, temp_files_to_clean, current_iter + 1, max_iterations, start_area ) except Exception as e: print_status(f"消除小面积多边形过程出错 (迭代 {current_iter}): {str(e)}") # 记录ArcPy消息 for msg in arcpy.GetMessages(2).split('\n'): if msg: print_status(f"ArcPy消除错误详情: {msg}") raise # 重新抛出异常 finally: # 确保删除临时图层,即使出错 if 'temp_layer' in locals() and arcpy.Exists(temp_layer): try: arcpy.management.Delete(temp_layer) # print_status(f"已删除临时图层: {temp_layer.name}") except Exception as delete_layer_err: print_status(f"警告: 无法删除临时图层 {temp_layer}: {str(delete_layer_err)}") # --- 主处理逻辑 --- def main(): """主函数:解析参数,执行处理流程,输出结果和状态""" params = None temp_files_to_clean = [] original_workspace = None try: # 1. 解析参数 params = parse_arguments() input_raster = params["input_raster"] raster_name = Path(input_raster).stem input_folder = params["input_folder"] output_folder = params["batch_output_folder"] clip_features = params["clip_features"] clip_enabled = params["clip_enabled"] remap_table = params["remap_table"] min_area = params["min_area"] area_unit = params["area_unit"] simplify = params["simplify"] # print_status(f"解析参数完成: {params}") # 2. 设置工作空间和环境 original_workspace = arcpy.env.workspace arcpy.env.workspace = input_folder arcpy.env.overwriteOutput = True print_status(f"ArcPy 工作空间设置为: {arcpy.env.workspace}") # 3. 校验输入/输出路径 if not arcpy.Exists(input_raster): raise FileNotFoundError(f"输入栅格文件不存在: {input_raster}") # 创建输出文件夹 if not os.path.exists(output_folder): os.makedirs(output_folder) print_status(f"已创建输出文件夹: {output_folder}") # 创建面积统计用文件夹 disk_output_path = os.path.join(output_folder, "面积统计用栅格面") time.sleep(random.random()) if not os.path.exists(disk_output_path): os.makedirs(disk_output_path) print_status(f"已创建面积统计文件夹: {disk_output_path}") if clip_enabled and not arcpy.Exists(clip_features): raise FileNotFoundError(f"裁剪要素类不存在: {clip_features}") # 4. 定义中间和最终输出路径 # 根据输出文件夹类型确定文件扩展名和命名方式 output_is_workspace = common_utils.get_data_type(output_folder) in ["Workspace", "FeatureDataset", "Geodatabase"] if output_is_workspace: # 输出到地理数据库或要素数据集 final_output_path = os.path.join(os.path.dirname(output_folder), f"{raster_name}_eliminate.shp") # final_output_path = f"{raster_name}_processed" else: # 输出到文件夹 (例如 Shapefile) final_output_path = os.path.join(output_folder, f"{raster_name}_eliminate.shp") # final_output_path = f"{raster_name}_processed.shp" # 5. 执行重分类 (如果 remap_table 存在) if remap_table: reclassed_raster = reclassify_raster(input_raster, remap_table, temp_files_to_clean) arcpy.Raster(reclassed_raster).save(os.path.join(output_folder,f"{Path(input_raster).stem}分级后.tif")) # 6. 栅格转多边形 print_status(f"开始栅格转多边形: {reclassed_raster}") temp_polygon_output = os.path.join("in_memory", f"raster_to_polygon_{uuid.uuid4().hex[:8]}") temp_files_to_clean.append(temp_polygon_output) simplify_value = "SIMPLIFY" if simplify else "NO_SIMPLIFY" arcpy.conversion.RasterToPolygon(reclassed_raster, temp_polygon_output, simplify_value, "VALUE") print_status(f"栅格转多边形完成,结果在内存: {temp_polygon_output}") # 将内存中的要素类保存到硬盘 disk_output_polygon = os.path.join(disk_output_path, f"{raster_name}_reclassed_polygon.shp") arcpy.CopyFeatures_management(temp_polygon_output, disk_output_polygon) print_status(f"已将重分类转面结果保存到硬盘: {disk_output_polygon}") # 7. 裁剪 (如果 clip_features 存在) current_polygon_source = temp_polygon_output if clip_enabled: print_status(f"开始裁剪要素类: {current_polygon_source} using {clip_features}") temp_cliped_polygon_output = os.path.join("in_memory", f"cliped_{uuid.uuid4().hex[:8]}") temp_files_to_clean.append(temp_cliped_polygon_output) arcpy.analysis.Clip(current_polygon_source, clip_features, temp_cliped_polygon_output) current_polygon_source = temp_cliped_polygon_output print_status(f"裁剪完成,结果在内存: {current_polygon_source}") # 多部件至单部件 (通常在裁剪后进行,确保每个要素都是独立的单部件) print_status(f"开始多部件至单部件转换: {current_polygon_source}") temp_multi_to_single_output = os.path.join("in_memory", f"single_{uuid.uuid4().hex[:8]}") temp_files_to_clean.append(temp_multi_to_single_output) arcpy.management.MultipartToSinglepart(current_polygon_source, temp_multi_to_single_output) current_polygon_source = temp_multi_to_single_output print_status(f"多部件至单部件转换完成,结果在内存: {current_polygon_source}") # 8. 消除小面积图斑 print_status(f"开始消除小面积图斑: {current_polygon_source} (阈值: {min_area} {area_unit})") eliminate_small_polygons( current_polygon_source, final_output_path, # 直接传递最终输出路径 min_area, area_unit, temp_files_to_clean # 传递临时文件列表 ) print_status("消除小面积图斑完成.") # 9. 最终清理和输出结果 print_status("处理流程全部完成.") # 清理在内存或临时位置生成的中间文件 temp_files_processor.clean_up_temp_files(temp_files=temp_files_to_clean, workspace=original_workspace) # 验证最终输出文件是否存在 if arcpy.Exists(final_output_path): print_result(True, final_output_path, "") else: raise Exception(f"处理完成,但最终输出文件 {final_output_path} 不存在。") except FileNotFoundError as fnf_e: error_msg = f"文件不存在错误: {str(fnf_e)}" print_status(error_msg) print_result(False, "", error_msg) except ValueError as ve: error_msg = f"参数错误或数据校验失败: {str(ve)}" print_status(error_msg) print_result(False, "", error_msg) except arcpy.ExecuteError: # 捕获 ArcPy 执行错误 error_msg = f"ArcPy 执行错误: {arcpy.GetMessages(2)}" print_status(error_msg) sys.stderr.write(f"ArcPyExecuteError:{arcpy.GetMessages(2)}\n") # 记录到标准错误 print_result(False, "", error_msg) except Exception as e: # 捕获其他未预料的错误 error_msg = f"发生未预料的错误: {str(e)}\n{traceback.format_exc()}" print_status(error_msg) sys.stderr.write(f"UnexpectedError:{error_msg}\n") # 记录到标准错误 print_result(False, "", error_msg) finally: # 确保在任何情况下都尝试清理(尽管在 except 块中也调用了) # 这里的调用是最后的保障,如果 except 块中的清理失败了 print_status("脚本结束,执行最终清理...") temp_files_processor.clean_up_temp_files(temp_files=temp_files_to_clean, workspace=original_workspace) print_status("最终清理完成.") sys.exit(0) # 正常退出脚本 if __name__ == "__main__": print_status("脚本开始执行...") main()