# -*- coding: utf-8 -*- """ 脚本运行器 (支持多进程) 用于从PyQt6界面调用独立脚本并管理并行执行 整合了多进程功能。 """ import os import sys import json import subprocess import traceback import uuid from PyQt6.QtCore import (QObject, pyqtSignal, QProcess, QProcessEnvironment) from pathlib import Path # 导入Path import functools # 导入 functools 用于偏函数 class ScriptRunner(QObject): """脚本运行器类,用于调用独立脚本处理任务并管理并行执行""" # 定义信号 (包含任务ID) # 将原始的 started, finished, error, log 信号修改为包含任务ID task_started = pyqtSignal(str, str) # 任务ID, 任务描述/消息 task_finished = pyqtSignal(str, bool, str) # 任务ID, 是否成功, 消息 (包含结果或错误) task_error = pyqtSignal(str, str) # 任务ID, 错误信息 task_log = pyqtSignal(str, str) # 任务ID, 日志信息 manager_log = pyqtSignal(str) # Runner 管理器自身的日志 def __init__(self, parent=None, max_concurrent=3): super().__init__(parent) self.max_concurrent = max_concurrent # 控制最大并行进程数 self.running_processes = {} # {task_id: QProcess实例} self.pending_tasks = ([]) # [(task_id, script_path, args_dict, task_description, working_dir), ...] self._next_task_id_counter = 0 # 用于生成简单的任务ID (uuid 更推荐) # 获取项目根目录 self.base_dir = self._get_base_dir() self.manager_log.emit(f"项目根目录: {self.base_dir}") self.manager_log.emit(f"最大并行任务数: {self.max_concurrent}") def _get_base_dir(self): """获取项目根目录""" # 根据你提供的项目结构图调整这里的逻辑 # 假设 script_runner.py 位于 ui/runners 目录下 current_dir = os.path.dirname(os.path.abspath(__file__)) # 向上两级找到项目根目录 (runners <- ui <- project_root) # 调整为向上两级,因为 ui 和 tools 在同一级 return os.path.dirname(os.path.dirname(current_dir)) # 假设 runners 在 ui 目录下 def _find_script(self, script_name): """查找脚本文件""" # 根据你提供的项目结构图调整可能的脚本路径 base_dir = Path(self.base_dir) possible_paths = [ base_dir / "tools" / "core" / script_name, Path(__file__).resolve().parents[2] / "core" / script_name, Path.cwd() / "tools" / "core" / script_name ] for path in possible_paths: path = os.path.normpath(path) if os.path.exists(path): self.manager_log.emit(f"找到脚本: {path}") return path self.manager_log.emit(f"警告: 无法找到脚本 '{script_name}', 尝试过以下路径:") for path in possible_paths: self.manager_log.emit(f" - {path} (存在: {os.path.exists(path)})") # 返回None表示找不到 return None def _get_arcgis_python(self): """获取ArcGIS Pro的Python解释器路径""" # 这里的逻辑与之前相同 try: arcgis_python_paths = [ r"C:\Program Files\ArcGIS\Pro\bin\Python\envs\arcgispro-py3\python.exe", r"D:\ProgramData\ArcGis_Py\.env\arcgispro-py3-clone\python.exe", ] # 检查环境变量 ARCGISPRO_PYTHON 或 CONDA_PREFIX (如果使用Conda环境) if "ARCGISPRO_PYTHON" in os.environ: arcgis_python_paths.insert(0, os.environ["ARCGISPRO_PYTHON"]) for path in arcgis_python_paths: path = os.path.normpath(path) if os.path.exists(path): self.manager_log.emit(f"使用ArcGIS Python解释器: {path}") return path self.manager_log.emit( "警告: 无法自动找到ArcGIS Pro Python解释器,尝试使用当前Python解释器." ) return sys.executable except Exception as e: self.manager_log.emit(f"获取ArcGIS Python路径失败: {str(e)}") return sys.executable # 修正:将添加任务到队列并尝试启动的逻辑命名为 run_script def run_script(self, script_name, args_dict, task_description=None, working_dir=None): """ 将一个脚本任务添加到队列中进行管理和并行执行。 Args: script_name (str): 要执行的脚本文件名。 args_dict (dict): 传递给脚本的参数字典。 task_description (str, optional): 任务的描述。 working_dir (str, optional): 脚本执行的工作目录。 返回: str: 分配给该任务的唯一任务ID 或 None 如果脚本不存在。 """ script_path = self._find_script(script_name) if not script_path or not os.path.exists(script_path): self.manager_log.emit(f"错误: 无法添加任务,脚本 '{script_name}' 不存在.") return None # 生成唯一的任务ID task_id = str(uuid.uuid4().hex[:8]) if task_description is None: task_description = os.path.basename(script_name) # 将任务信息添加到待处理队列 self.pending_tasks.append((task_id, script_path, args_dict, task_description, working_dir)) self.manager_log.emit(f"已添加任务{task_id} - {task_description}到队列...") # 尝试启动下一个任务 self._start_next_task() return task_id # 新增方法:启动下一个可用的任务 def _start_next_task(self): """检查队列和正在运行的进程数量,启动下一个可用的任务""" while len(self.running_processes) < self.max_concurrent and self.pending_tasks: task_id, script_path, args_dict, task_description, working_dir = (self.pending_tasks.pop(0)) self.manager_log.emit(f"正在启动任务: {task_id} - {task_description}") self.task_started.emit(task_id, task_description) # 调用内部方法实际启动单个进程 self._start_single_process(task_id, script_path, args_dict, task_description, working_dir) # 新增方法:实际启动单个 QProcess 进程 def _start_single_process(self, task_id, script_path, args_dict, task_description, working_dir): """ 启动一个 QProcess 进程来执行指定的脚本任务。 这是从原始 run_script 中提取的启动逻辑。 """ python_executable = self._get_arcgis_python() if not python_executable or not os.path.exists(python_executable): error_msg = f"无法找到有效的 ArcGIS Pro Python 解释器: {python_executable or '未找到'}" self.task_error.emit(task_id, error_msg) self._on_process_finished(task_id, None, -1, QProcess.ExitStatus.NormalExit) # 模拟失败完成 return # 构建环境变量 env = QProcessEnvironment.systemEnvironment() env.insert("PATH", os.environ["PATH"]) # 继承系统PATH env.insert("PYTHONPATH", ";".join(sys.path)) # 继承当前Python路径 cmd = [python_executable, "-u", script_path] # 使用 -u 参数禁用缓冲 # 将参数字典转换为命令行参数 for key, value in args_dict.items(): if not isinstance(key, str) or not key: self.task_error.emit(task_id, f"警告: 跳过无效的参数键: {key}") continue param_key = f"--{key}" if isinstance(value, bool): if value: cmd.append(param_key) elif value is not None and value != "NONE": cmd.append(param_key) if isinstance(value, (list, dict)): try: cmd.append(json.dumps(value)) except Exception as e: self.task_error.emit(task_id, f"警告: 无法将参数 '{key}' 序列化为 JSON: {str(e)}") cmd.append(str(value)) else: cmd.append(str(value)) self.task_log.emit(task_id, f"即将执行命令: {subprocess.list2cmdline(cmd)}") proc = QProcess() proc.setProcessEnvironment(env) # 关键设置 # 连接信号到处理函数,使用 functools.partial 传递 task_id 和 process 对象 # 注意:_handle_stdout/_handle_stderr 信号本身不传递 process,需要通过 task_id 从 running_processes 获取 # _on_process_finished 信号传递 exit_code, exit_status,通过 partial 传递 task_id 和 proc proc.readyReadStandardOutput.connect( functools.partial(self._handle_stdout, task_id=task_id) ) proc.readyReadStandardError.connect( functools.partial(self._handle_stderr, task_id=task_id) ) proc.finished.connect( functools.partial(self._on_process_finished, task_id=task_id, proc=proc) ) # 设置工作目录 if working_dir: if os.path.exists(working_dir): proc.setWorkingDirectory(working_dir) self.task_log.emit(task_id, f"工作目录设置为: {working_dir}") else: self.task_error.emit(task_id, f"警告: 工作目录不存在: {working_dir}") # 启动进程 try: proc.start(cmd[0], cmd[1:]) self.running_processes[task_id] = proc # 将进程实例添加到正在运行列表 self.task_log.emit(task_id, f"进程已启动,PID: {proc.processId()}") # 可以将任务描述等信息附加到 QProcess 对象,方便在槽函数中使用 proc.setProperty("task_description", task_description) except Exception as e: error_msg = f"无法启动进程: {str(e)}\n命令: {subprocess.list2cmdline(cmd)}" self.task_error.emit(task_id, error_msg) # 在启动失败时,也需要调用 finished 槽来清理并触发下一个任务 self._on_process_finished(task_id, proc, -1, QProcess.ExitStatus.NormalExit) # 模拟失败完成 # 修改处理槽函数以接受 task_id 并从 running_processes 获取进程 def _handle_stdout(self, task_id): """处理指定任务的标准输出""" proc = self.running_processes.get(task_id) if not proc: return # 如果进程已不在运行列表中,忽略信号 try: data = proc.readAllStandardOutput().data() # 尝试多种解码方式,确保能处理中文 try: text = data.decode("utf-8") except UnicodeDecodeError: try: text = data.decode("gbk") # Windows 常用 except UnicodeDecodeError: text = data.decode("utf-8", errors="replace") # 保底 if text: for line in text.splitlines(): if line.strip(): # 根据行的前缀进一步解析,例如 STATUS: 或 RESULT: if line.startswith("STATUS:"): status_message = line[len("STATUS:") :].strip() self.task_log.emit(task_id, f"状态: {status_message}") elif line.startswith("RESULT:"): self.task_log.emit(task_id, f"原始结果行: {line.strip()}") proc.setProperty("last_result_line", line.strip()) # 将结果行附加到 QProcess 对象 else: self.task_log.emit(task_id, line.strip()) # 其他普通输出 except Exception as e: self.task_error.emit(task_id, f"处理标准输出时出错: {str(e)}") self.task_log.emit(task_id, traceback.format_exc()) # 记录详细错误 def _handle_stderr(self, task_id): """处理指定任务的错误输出""" proc = self.running_processes.get(task_id) if not proc: return # 如果进程已不在运行列表中,忽略信号 try: data = proc.readAllStandardError().data() # 尝试多种解码方式 try: text = data.decode("utf-8") except UnicodeDecodeError: try: text = data.decode("gbk") # Windows 常用 except UnicodeDecodeError: text = data.decode("utf-8", errors="replace") # 保底 if text: for line in text.splitlines(): if line.strip(): # 错误信息直接作为错误日志发送 self.task_error.emit(task_id, f"脚本错误: {line.strip()}") # 可以在这里将错误输出附加到 QProcess 对象,以便在 finished 中汇总 current_stderr = proc.property("accumulated_stderr") or "" proc.setProperty("accumulated_stderr", current_stderr + line.strip() + "\n") except Exception as e: self.task_error.emit(task_id, f"处理错误输出时出错: {str(e)}") self.task_log.emit(task_id, traceback.format_exc()) # 记录详细错误 # 修改处理槽函数以接受 task_id 和 proc 并进行清理和调度 def _on_process_finished(self, exit_code, exit_status, task_id, proc): """处理指定任务的进程完成事件""" self.task_log.emit(task_id, f"进程完成. 退出码: {exit_code}, 退出状态: {exit_status}") # 在处理完成信号时,立即断开输出信号连接 if proc: try: proc.readyReadStandardOutput.disconnect() except TypeError: # 有时信号可能已经断开,捕获 TypeError pass try: proc.readyReadStandardError.disconnect() except TypeError: pass # 也可以断开 finished 信号本身,但通常不需要,因为它只触发一次 # try: # proc.finished.disconnect() # except TypeError: # pass success = False message = "未知错误或脚本未返回明确结果" # 默认消息 # 从运行列表中移除进程 if task_id in self.running_processes: # 确保删除的是正确的进程实例 if self.running_processes[task_id] is proc: del self.running_processes[task_id] else: self.manager_log.emit(f"警告: 任务ID {task_id} 在 running_processes 中对应进程不匹配完成信号的进程实例.") # 这种情况比较异常,可能需要更深入的调试 else: # 如果任务ID不在运行列表中,可能是重复的 finished 信号或逻辑错误 # 或者是在 _start_single_process 中模拟失败完成的情况 if (proc and proc.processId() != 0): # 检查 proc 是否是有效的 QProcess 实例且已启动 self.manager_log.emit(f"警告: 收到未知任务ID {task_id} 的完成信号,但 PID {proc.processId()} 已知。") else: self.manager_log.emit(f"警告: 收到未知任务ID {task_id} 的完成信号.") # 尝试从 QProcess 对象属性中获取 RESULT 行 # 在 _start_single_process 中模拟失败完成时,proc 可能为 None result_line = proc.property("last_result_line") if proc else None accumulated_stderr = proc.property("accumulated_stderr") or "" if proc else "" if result_line: try: # 解析 RESULT 行 parts = result_line[len("RESULT:") :].split("|", 2) # 分割最多两次 if len(parts) >= 3: success_from_script = parts[0] == "True" output_path = parts[1] error_msg_from_script = parts[2] if success_from_script: success = True message = f"成功: {output_path}" else: # 如果脚本返回失败,使用脚本提供的错误信息 success = False message = f"脚本返回失败: {error_msg_from_script}" else: # 如果 RESULT 行格式不正确 success = False message = f"脚本返回结果格式错误: {result_line}" except Exception as e: success = False message = f"解析脚本返回结果时出错: {str(e)}\n原始结果行: {result_line}" else: # 如果没有找到 RESULT 行,根据退出码判断 if exit_status == QProcess.ExitStatus.NormalExit and exit_code == 0: success = True message = "脚本正常退出,但未找到 RESULT 行。" self.task_log.emit(task_id, message) # 记录为日志而不是错误 else: success = False message = f"脚本异常退出 (退出码: {exit_code})." # 如果有累积的错误输出,也添加到消息中 if accumulated_stderr: message += f"\n错误输出:\n{accumulated_stderr.strip()}" self.task_error.emit(task_id, message) # 记录为错误 # 清理 QProcess 实例 (仅当 proc 对象有效时) if proc: proc.close() proc.deleteLater() # 延迟删除对象,确保槽函数执行完毕 # 发射任务完成信号 if len(self.running_processes) == 0: self.task_finished.emit(task_id, success, message) # 尝试启动下一个任务 self._start_next_task() def stop_all_tasks(self): """尝试停止所有正在运行的任务""" self.manager_log.emit("正在尝试停止所有任务...") # 复制字典的键,避免在迭代时修改 tasks_to_stop = list(self.running_processes.keys()) for task_id in tasks_to_stop: proc = self.running_processes.get(task_id) if proc and proc.state() == QProcess.ProcessState.Running: self.task_log.emit(task_id, "尝试终止进程...") proc.terminate() # 尝试优雅终止 (发送SIGTERM) # proc.kill() # 强制杀死进程 (发送SIGKILL) - 更可靠,但可能导致数据损坏 # --- 保留原有 run_... 方法,它们现在会调用 run_script 来添加任务 --- def run_export_map(self, params): script_name = "export_map_v1.py" task_desc = f"导出地图: {params.get('county_name', '未知区县')}" return self.run_script(script_name, params, task_description=task_desc) def run_export_layout(self, params): script_name = "export_layout.py" task_desc = f"导出布局: {os.path.basename(params.get('input_aprx_folder', '未知文件夹'))}" return self.run_script(script_name, params, task_description=task_desc) def run_batch_export_layout(self, params): script_name = "batch_export_layout.py" task_desc = f"批量导出布局: {os.path.basename(params.get('input_aprx_folder', '未知文件夹'))}" return self.run_script(script_name, params, task_description=task_desc) def run_process_raster(self, params): script_name = ("raster_to_polygon.py") task_desc = (f"处理栅格: {os.path.basename(params.get('input_raster', '未知栅格'))}") return self.run_script(script_name, params, task_description=task_desc) def run_area_stat(self, params): script_name = "stats_area_to_excel.py" task_desc = f"统计面积: {os.path.basename(params.get('reclassed_polygon', '未知面要素'))}" return self.run_script(script_name, params, task_description=task_desc) def run_suanhua_stat(self, params): script_name = "stats_sh_to_excel.py" task_desc = f"统计酸化: {os.path.basename(params.get('reclassed_polygon', '未知面要素'))}" return self.run_script(script_name, params, task_description=task_desc) def run_soil_prop_stat(self, params): script_name = "stats_soil_prop_to_excel.py" task_desc = f"统计酸化: {os.path.basename(params.get('reclassed_polygon', '未知面要素'))}" return self.run_script(script_name, params, task_description=task_desc) def run_excel_to_jpg(self, params): script_name = "export_excel_to_jpg_v1.py" task_desc = f"导出Excel: {os.path.basename(params.get('excel_path', '未知文件'))}" return self.run_script(script_name, params, task_description=task_desc) def run_test_script(self, params): script_name = "test_script.py" task_desc = f"测试脚本: {params.get('message', '无消息')}" return self.run_script(script_name, params, task_description=task_desc) # 可以添加其他通用的任务管理方法,例如: def get_pending_tasks(self): """获取当前待处理任务列表""" # 返回任务ID和描述的列表 return [(tid, desc) for tid, sp, args, desc, wd in self.pending_tasks] def get_running_tasks(self): """获取当前正在运行任务的 task_id 和描述的列表""" # 需要从 QProcess 对象中获取任务描述 (如果在启动时设置了属性) running_list = [] for tid, proc in self.running_processes.items(): desc = proc.property("task_description") or "未知任务" running_list.append((tid, desc)) return running_list def get_max_concurrent(self): """获取最大并行任务数设置""" return self.max_concurrent def set_max_concurrent(self, count): """设置最大并行任务数,并尝试启动更多任务""" if count > 0: self.max_concurrent = count self.manager_log.emit(f"最大并行任务数已更改为: {self.max_concurrent}") self._start_next_task() # 尝试启动更多任务 else: self.manager_log.emit("警告: 最大并行任务数必须大于 0。")