#!/usr/bin/env python # -*- coding: utf-8 -*- """ 脚本运行器 用于从PyQt6界面调用独立脚本 """ import os import sys import json import subprocess import traceback from PyQt6.QtCore import QObject, pyqtSignal, QProcess, QThread, QRunnable, QThreadPool class LambdaTask(QRunnable): """ 用于在QThreadPool中执行任意函数的任务类 """ def __init__(self, fn, *args, **kwargs): super().__init__() self.fn = fn self.args = args self.kwargs = kwargs def run(self): """执行函数""" try: self.fn(*self.args, **self.kwargs) except Exception as e: print(f"任务执行错误: {str(e)}") traceback.print_exc() class ScriptRunner(QObject): """脚本运行器类,用于调用独立脚本处理任务""" # 定义信号 started = pyqtSignal(str) finished = pyqtSignal(object) error = pyqtSignal(str) log = pyqtSignal(str) def __init__(self, parent=None): super().__init__(parent) # self.parent_ref = weakref.ref(parent) if parent else None # 获取项目根目录 self.base_dir = self._get_base_dir() self.log.emit(f"项目根目录: {self.base_dir}") def _get_base_dir(self): """获取项目根目录""" # 当前文件的目录 current_dir = os.path.dirname(os.path.abspath(__file__)) # 向上两级找到项目根目录 (ui -> tools -> project_root) return os.path.dirname(os.path.dirname(current_dir)) def _find_script(self, script_name): """查找脚本文件""" # 可能的脚本路径 possible_paths = [ os.path.join(self.base_dir, 'tools', 'core', script_name), # 从项目根目录查找 os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'core', script_name), # 从tools目录查找 os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'core', script_name)), # 相对于ui目录 os.path.join(os.getcwd(), 'tools', 'core', script_name) # 从当前工作目录查找 ] # 尝试每个可能的路径 for path in possible_paths: path = os.path.normpath(path) # 规范化路径 if os.path.exists(path): self.log.emit(f"找到脚本: {path}") return path # 如果找不到脚本,记录所有尝试的路径 self.log.emit("无法找到脚本,尝试过以下路径:") for path in possible_paths: path = os.path.normpath(path) self.log.emit(f" - {path} (存在: {os.path.exists(path)})") # 返回第一个路径(即使它不存在) return possible_paths[0] def run_script_external(self, script_path, args, working_dir=None): """ 使用外部Python窗口运行脚本,显示实时输出 Args: script_path: 脚本路径 args: 命令行参数字典 working_dir: 工作目录 Returns: 返回启动是否成功 """ try: # 更详细的日志 self.log.emit(f"尝试在外部窗口运行脚本: {script_path}") # 检查脚本是否存在 if not os.path.exists(script_path): self.error.emit(f"脚本不存在: {script_path}") self.log.emit(f"当前工作目录: {os.getcwd()}") self.log.emit(f"尝试查找脚本...") # 从脚本名尝试查找脚本 script_name = os.path.basename(script_path) actual_path = self._find_script(script_name) if not os.path.exists(actual_path): self.error.emit(f"无法找到脚本: {script_name}") return False script_path = actual_path self.log.emit(f"使用脚本路径: {script_path}") # 获取ArcGIS Pro的Python解释器路径 python_exe = self._get_arcgis_python() or sys.executable self.log.emit(f"使用Python解释器: {python_exe}") # 构建命令行参数 cmd = [python_exe, script_path] # 添加参数 for key, value in args.items(): if key.startswith('--'): param_key = key else: param_key = f"--{key}" if isinstance(value, bool): # 布尔参数 if value: cmd.append(param_key) elif isinstance(value, list): # 列表参数,通过JSON序列化传递 cmd.append(param_key) cmd.append(json.dumps(value)) elif value is not None: # 其他参数 cmd.append(param_key) cmd.append(str(value)) # 通知开始 self.started.emit(f"开始执行脚本: {os.path.basename(script_path)}") # 创建临时批处理文件,为了能自动关闭窗口 batch_file_path = os.path.join(os.environ.get('TEMP', '.'), f"run_script_{os.getpid()}.bat") # 构建批处理文件内容 batch_content = f"""@echo off chcp 65001 > nul echo 正在执行脚本: {os.path.basename(script_path)}... echo 命令: {' '.join(cmd)} echo. """ # 设置工作目录 if working_dir: batch_content += f'cd /d "{working_dir}"\n' # 添加执行命令 batch_content += f'"{python_exe}" "{script_path}"' # 添加命令行参数 for i in range(2, len(cmd)): # 检查参数是否需要引号 param = cmd[i] if ' ' in param or ',' in param or ';' in param or '&' in param or '[' in param or ']' in param: batch_content += f' "{param}"' else: batch_content += f' {param}' # 添加执行完成后的提示和暂停 batch_content += "\necho.\necho 脚本执行完成,窗口将在3秒后自动关闭...\ntimeout /t 3 >nul\n" # 写入批处理文件 with open(batch_file_path, 'w', encoding='utf-8') as f: f.write(batch_content) self.log.emit(f"创建批处理文件: {batch_file_path}") # 使用subprocess.Popen启动外部窗口 # 使用start命令在新窗口中运行批处理文件 start_cmd = f'start "执行脚本 - {os.path.basename(script_path)}" "{batch_file_path}"' subprocess.Popen(start_cmd, shell=True) # 模拟成功启动 self.log.emit("已在外部窗口启动脚本") # 注意:由于是在外部窗口运行,我们无法获知脚本的实际结果 # 假设脚本已成功启动,但实际执行结果需要用户观察外部窗口 QThread.msleep(1000) # 等待1秒 self.finished.emit(True) # 预定批处理文件的删除 def delete_batch_file(): try: if os.path.exists(batch_file_path): os.remove(batch_file_path) self.log.emit(f"已删除临时批处理文件: {batch_file_path}") except Exception as e: self.log.emit(f"删除临时批处理文件失败: {str(e)}") # 创建延迟删除任务 QThread.sleep(10) # 确保批处理文件有足够时间执行 delete_task = LambdaTask(delete_batch_file) thread_pool = QThreadPool.globalInstance() if thread_pool is not None: thread_pool.start(delete_task) return True except Exception as e: self.error.emit(f"启动外部脚本时出错: {str(e)}") self.log.emit(traceback.format_exc()) return False def run_script(self, script_path, args, working_dir=None): """使用非阻塞方式运行脚本""" try: # 检查脚本是否存在 if not os.path.exists(script_path): self.error.emit(f"脚本不存在: {script_path}") self.log.emit(f"当前工作目录: {os.getcwd()}") self.log.emit(f"尝试查找脚本...") # 从脚本名尝试查找脚本 script_name = os.path.basename(script_path) actual_path = self._find_script(script_name) if not os.path.exists(actual_path): self.error.emit(f"无法找到脚本: {script_name}") return False script_path = actual_path # 获取ArcGIS Pro的Python解释器路径 python_exe = self._get_arcgis_python() or sys.executable self.log.emit(f"使用Python解释器: {python_exe}") # 构建命令行参数 cmd = [python_exe, "-u", script_path] # 添加参数 for key, value in args.items(): if key.startswith('--'): param_key = key else: param_key = f"--{key}" if isinstance(value, bool): # 布尔参数 if value: cmd.append(param_key) elif value is not None: # 其他参数 cmd.append(param_key) cmd.append(str(value)) # 创建QProcess而不是subprocess.Popen process = QProcess() # 连接QProcess的信号 process.readyReadStandardOutput.connect( lambda: self._handle_stdout(process) ) process.readyReadStandardError.connect( lambda: self._handle_stderr(process) ) process.finished.connect( lambda exit_code, exit_status: self._handle_finished( exit_code, exit_status, process ) ) # 设置工作目录 if working_dir: process.setWorkingDirectory(working_dir) # 通知开始 self.started.emit(f"开始执行脚本: {os.path.basename(script_path)}") # 非阻塞启动进程 process.start(cmd[0], cmd[1:]) # 将进程对象保存在类变量中,防止被垃圾回收 self.current_process = process # 返回True表示进程已启动(结果将通过信号异步通知) return True except Exception as e: self.error.emit(f"执行脚本时出错: {str(e)}") self.log.emit(traceback.format_exc()) return False def _handle_stdout(self, process): """处理标准输出""" try: # 首先尝试使用utf-8解码 data = process.readAllStandardOutput().data() try: text = data.decode('utf-8') except UnicodeDecodeError: # utf-8解码失败,尝试使用系统默认编码或GBK(中文Windows常用) try: text = data.decode('gbk') except UnicodeDecodeError: # 如果还是失败,使用errors='replace'选项替换无法解码的字符 text = data.decode('utf-8', errors='replace') if text: for line in text.splitlines(): if line.strip(): self.log.emit(line.strip()) except Exception as e: self.log.emit(f"处理标准输出时出错: {str(e)}") def _handle_stderr(self, process): """处理错误输出""" try: # 首先尝试使用utf-8解码 data = process.readAllStandardError().data() try: text = data.decode('utf-8') except UnicodeDecodeError: # utf-8解码失败,尝试使用系统默认编码或GBK(中文Windows常用) try: text = data.decode('gbk') except UnicodeDecodeError: # 如果还是失败,使用errors='replace'选项替换无法解码的字符 text = data.decode('utf-8', errors='replace') if text: for line in text.splitlines(): if line.strip(): self.log.emit(f"错误: {line.strip()}") except Exception as e: self.log.emit(f"处理错误输出时出错: {str(e)}") def _handle_finished(self, exit_code, exit_status, process): """处理进程结束""" if exit_code == 0: self.log.emit(f"脚本执行成功,返回代码: {exit_code}") self.finished.emit(True) else: self.error.emit(f"脚本执行失败,返回代码: {exit_code}") self.finished.emit(False) # 清理进程 process.close() if hasattr(self, 'current_process') and self.current_process == process: self.current_process = None def _get_arcgis_python(self): """获取ArcGIS Pro的Python解释器路径""" try: # 常见的ArcGIS Pro Python路径 arcgis_python_paths = [ r"C:\Program Files\ArcGIS\Pro\bin\Python\envs\arcgispro-py3\python.exe", r"C:\Program Files\ArcGIS\Pro\bin\Python\python.exe" ] # 检查环境变量 if 'ARCGISPRO_PYTHON' in os.environ: arcgis_python_paths.insert(0, os.environ['ARCGISPRO_PYTHON']) # 尝试每个可能的路径 for path in arcgis_python_paths: if os.path.exists(path): return path return None # 如果找不到,返回None except Exception as e: self.log.emit(f"获取ArcGIS Python路径失败: {str(e)}") return None def run_export_map(self, params): """ 运行导出地图脚本 Args: params: 参数字典,包含: - config_file: 配置文件路径 - county_name: 区县名称 - polygon_list: 要导出的图层列表 - template_aprx_file: 模板文件路径 - output_path: 输出路径 - data_source_path: 数据源路径 - symbol_path: 符号文件路径 - force_regenerate: 是否强制重新生成 Returns: 返回脚本执行结果 """ try: # 检查必要参数是否存在 required_params = ['config_file', 'county_name', 'polygon_list', 'template_aprx_file', 'output_path', 'data_source_path', 'symbol_path'] for param in required_params: if param not in params: self.error.emit(f"缺少必要参数: {param}") return False # 查找脚本路径 script_path = self._find_script('export_map.py') # 构建命令行参数 args = { 'config_file': params['config_file'], 'county_name': params['county_name'], 'polygon_list': json.dumps(params['polygon_list']), # 将图层列表序列化为JSON字符串 'template_aprx_file': params['template_aprx_file'], 'output_path': params['output_path'], 'data_source_path': params['data_source_path'], 'symbol_path': params['symbol_path'] } # 添加可选参数 if 'force_regenerate' in params and params['force_regenerate']: args['force_regenerate'] = True # 使用外部窗口执行脚本 return self.run_script(script_path, args) except Exception as e: self.error.emit(f"执行导出地图脚本时出错: {str(e)}") self.log.emit(traceback.format_exc()) return False def run_batch_export_map(self, params): """运行批量导出地图脚本 """ try: # 获取参数 if 'export_config' not in params: self.error.emit(f"缺少必要参数: export_config") return False export_config = params['export_config'] county_name = params['county_name'] template_aprx_file = params['template_aprx_file'] output_folder = params['output_path'] data_source_path = params['data_source_path'] symbol_path = params['symbol_path'] polygon_list = params.get('polygon_list', []) # 如果指定了图层列表,则只导出这些图层 force_regenerate = params.get('force_regenerate', False) # 是否强制重新生成工程文件 # 如果没有指定图层列表,则使用配置文件中的所有图层 if not polygon_list: polygon_list = list(export_config.keys()) self.log.emit(f"开始批量导出 {len(polygon_list)} 个图层") # 准备脚本路径 script_path = self._find_script('export_map.py') # 记录成功和失败的图层 success_count = 0 failed_count = 0 # 循环处理每个图层 for layer_name in polygon_list: try: self.log.emit(f"\n===== 开始处理图层: {layer_name} =====") # 构建命令行参数 args = { 'config': params['config_file'], 'county': county_name, 'layer': layer_name, 'template': template_aprx_file, 'output': output_folder, 'data_source': data_source_path, 'symbol': symbol_path, 'force': force_regenerate } # 执行脚本 if self.run_script(script_path, args): success_count += 1 else: failed_count += 1 except Exception as e: self.log.emit(f"处理图层 {layer_name} 时出错: {str(e)}") failed_count += 1 # 通知完成 self.log.emit(f"\n===== 批量处理完成 =====") self.log.emit(f"总计图层: {len(polygon_list)}") self.log.emit(f"成功: {success_count} 个") self.log.emit(f"失败: {failed_count} 个") result = { 'total': len(polygon_list), 'success': success_count, 'failed': failed_count } self.finished.emit(result) return result except Exception as e: self.error.emit(f"执行批量导出地图脚本时出错: {str(e)}") self.log.emit(traceback.format_exc()) return False def run_export_layout(self, params): """ 运行导出布局脚本 Args: params: 参数字典 Returns: 返回脚本执行结果 """ try: # 准备参数 script_path = self._find_script('export_layout.py') # 构建命令行参数 args = { 'mode': 'single', 'aprx': params['aprx_path'], 'output': params['output_path'], 'format': params.get('export_format', 'PDF'), 'resolution': params.get('resolution', 300), 'name': params.get('output_name', '') } # 执行脚本 # return self.run_script(script_path, args) except Exception as e: self.error.emit(f"执行导出布局脚本时出错: {str(e)}") self.log.emit(traceback.format_exc()) return False def run_batch_export_layout(self, params): """ 运行批量导出布局脚本 Args: params: 参数字典 Returns: 返回脚本执行结果 """ try: # 准备参数 script_path = self._find_script('export_layout.py') # 构建命令行参数 args = { 'aprx_file_list':json.dumps(params['aprx_file_list']), 'input_aprx_folder': params['input_aprx_folder'], 'output': params['output_image_path'], 'format': params.get('export_format', 'PDF'), 'resolution': params.get('resolution', 300), 'use_multiprocessing': params.get('use_multiprocessing', False), 'process_count': params.get('process_count', 2) } # 执行脚本 return self.run_script(script_path, args) except Exception as e: self.error.emit(f"执行批量导出布局脚本时出错: {str(e)}") self.log.emit(traceback.format_exc()) return False def run_process_raster(self, params): """ 运行栅格处理脚本 Args: params: 参数字典 Returns: 返回脚本执行结果 """ try: # 准备参数 script_path = self._find_script('raster_to_vector.py') # 构建命令行参数 args = { 'command': 'process', 'input': params['input_raster'], 'output': params['output_raster'], 'format': params.get('output_format', 'TIFF'), 'compression': params.get('compression', 'LZW') } # 执行脚本 return self.run_script(script_path, args) except Exception as e: self.error.emit(f"执行栅格处理脚本时出错: {str(e)}") self.log.emit(traceback.format_exc()) return False def run_raster_batch_process(self, params): """ 运行批量栅格处理脚本 Args: params: 参数字典 Returns: 返回脚本执行结果 """ try: # 准备参数 script_path = self._find_script('raster_handler.py') params["selected_files"] = json.dumps(params.get("selected_files")) # 执行脚本 return self.run_script(script_path, params) except Exception as e: self.error.emit(f"执行批量栅格处理脚本时出错: {str(e)}") self.log.emit(traceback.format_exc()) return False def run_test_script(self, params): try: # 检查必要参数是否存在 required_params = ['message', 'count'] for param in required_params: if param not in params: self.error.emit(f"缺少必要参数: {param}") return False # 查找脚本路径 script_path = self._find_script('test_script.py') # 构建命令行参数 args = { 'message': params['message'], 'count': params['count'] } # 执行脚本 return self.run_script(script_path, args) except Exception as e: self.error.emit(f"执行导出地图脚本时出错: {str(e)}") self.log.emit(traceback.format_exc()) return False