Files
ArcGis_Py/tools/ui/runners/script_runner.txt
2026-04-22 12:27:49 +08:00

655 lines
25 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
脚本运行器
用于从PyQt6界面调用独立脚本
"""
import os
import sys
import json
import subprocess
import traceback
from PyQt6.QtCore import QObject, pyqtSignal, QProcess, QThread, QRunnable, QThreadPool
class LambdaTask(QRunnable):
"""
用于在QThreadPool中执行任意函数的任务类
"""
def __init__(self, fn, *args, **kwargs):
super().__init__()
self.fn = fn
self.args = args
self.kwargs = kwargs
def run(self):
"""执行函数"""
try:
self.fn(*self.args, **self.kwargs)
except Exception as e:
print(f"任务执行错误: {str(e)}")
traceback.print_exc()
class ScriptRunner(QObject):
"""脚本运行器类,用于调用独立脚本处理任务"""
# 定义信号
started = pyqtSignal(str)
finished = pyqtSignal(object)
error = pyqtSignal(str)
log = pyqtSignal(str)
def __init__(self, parent=None):
super().__init__(parent)
# self.parent_ref = weakref.ref(parent) if parent else None
# 获取项目根目录
self.base_dir = self._get_base_dir()
self.log.emit(f"项目根目录: {self.base_dir}")
def _get_base_dir(self):
"""获取项目根目录"""
# 当前文件的目录
current_dir = os.path.dirname(os.path.abspath(__file__))
# 向上两级找到项目根目录 (ui -> tools -> project_root)
return os.path.dirname(os.path.dirname(current_dir))
def _find_script(self, script_name):
"""查找脚本文件"""
# 可能的脚本路径
possible_paths = [
os.path.join(self.base_dir, 'tools', 'core', script_name), # 从项目根目录查找
os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'core', script_name), # 从tools目录查找
os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'core', script_name)), # 相对于ui目录
os.path.join(os.getcwd(), 'tools', 'core', script_name) # 从当前工作目录查找
]
# 尝试每个可能的路径
for path in possible_paths:
path = os.path.normpath(path) # 规范化路径
if os.path.exists(path):
self.log.emit(f"找到脚本: {path}")
return path
# 如果找不到脚本,记录所有尝试的路径
self.log.emit("无法找到脚本,尝试过以下路径:")
for path in possible_paths:
path = os.path.normpath(path)
self.log.emit(f" - {path} (存在: {os.path.exists(path)})")
# 返回第一个路径(即使它不存在)
return possible_paths[0]
def run_script_external(self, script_path, args, working_dir=None):
"""
使用外部Python窗口运行脚本显示实时输出
Args:
script_path: 脚本路径
args: 命令行参数字典
working_dir: 工作目录
Returns:
返回启动是否成功
"""
try:
# 更详细的日志
self.log.emit(f"尝试在外部窗口运行脚本: {script_path}")
# 检查脚本是否存在
if not os.path.exists(script_path):
self.error.emit(f"脚本不存在: {script_path}")
self.log.emit(f"当前工作目录: {os.getcwd()}")
self.log.emit(f"尝试查找脚本...")
# 从脚本名尝试查找脚本
script_name = os.path.basename(script_path)
actual_path = self._find_script(script_name)
if not os.path.exists(actual_path):
self.error.emit(f"无法找到脚本: {script_name}")
return False
script_path = actual_path
self.log.emit(f"使用脚本路径: {script_path}")
# 获取ArcGIS Pro的Python解释器路径
python_exe = self._get_arcgis_python() or sys.executable
self.log.emit(f"使用Python解释器: {python_exe}")
# 构建命令行参数
cmd = [python_exe, script_path]
# 添加参数
for key, value in args.items():
if key.startswith('--'):
param_key = key
else:
param_key = f"--{key}"
if isinstance(value, bool):
# 布尔参数
if value:
cmd.append(param_key)
elif isinstance(value, list):
# 列表参数通过JSON序列化传递
cmd.append(param_key)
cmd.append(json.dumps(value))
elif value is not None:
# 其他参数
cmd.append(param_key)
cmd.append(str(value))
# 通知开始
self.started.emit(f"开始执行脚本: {os.path.basename(script_path)}")
# 创建临时批处理文件,为了能自动关闭窗口
batch_file_path = os.path.join(os.environ.get('TEMP', '.'), f"run_script_{os.getpid()}.bat")
# 构建批处理文件内容
batch_content = f"""@echo off
chcp 65001 > nul
echo 正在执行脚本: {os.path.basename(script_path)}...
echo 命令: {' '.join(cmd)}
echo.
"""
# 设置工作目录
if working_dir:
batch_content += f'cd /d "{working_dir}"\n'
# 添加执行命令
batch_content += f'"{python_exe}" "{script_path}"'
# 添加命令行参数
for i in range(2, len(cmd)):
# 检查参数是否需要引号
param = cmd[i]
if ' ' in param or ',' in param or ';' in param or '&' in param or '[' in param or ']' in param:
batch_content += f' "{param}"'
else:
batch_content += f' {param}'
# 添加执行完成后的提示和暂停
batch_content += "\necho.\necho 脚本执行完成窗口将在3秒后自动关闭...\ntimeout /t 3 >nul\n"
# 写入批处理文件
with open(batch_file_path, 'w', encoding='utf-8') as f:
f.write(batch_content)
self.log.emit(f"创建批处理文件: {batch_file_path}")
# 使用subprocess.Popen启动外部窗口
# 使用start命令在新窗口中运行批处理文件
start_cmd = f'start "执行脚本 - {os.path.basename(script_path)}" "{batch_file_path}"'
subprocess.Popen(start_cmd, shell=True)
# 模拟成功启动
self.log.emit("已在外部窗口启动脚本")
# 注意:由于是在外部窗口运行,我们无法获知脚本的实际结果
# 假设脚本已成功启动,但实际执行结果需要用户观察外部窗口
QThread.msleep(1000) # 等待1秒
self.finished.emit(True)
# 预定批处理文件的删除
def delete_batch_file():
try:
if os.path.exists(batch_file_path):
os.remove(batch_file_path)
self.log.emit(f"已删除临时批处理文件: {batch_file_path}")
except Exception as e:
self.log.emit(f"删除临时批处理文件失败: {str(e)}")
# 创建延迟删除任务
QThread.sleep(10) # 确保批处理文件有足够时间执行
delete_task = LambdaTask(delete_batch_file)
thread_pool = QThreadPool.globalInstance()
if thread_pool is not None:
thread_pool.start(delete_task)
return True
except Exception as e:
self.error.emit(f"启动外部脚本时出错: {str(e)}")
self.log.emit(traceback.format_exc())
return False
def run_script(self, script_path, args, working_dir=None):
"""使用非阻塞方式运行脚本"""
try:
# 检查脚本是否存在
if not os.path.exists(script_path):
self.error.emit(f"脚本不存在: {script_path}")
self.log.emit(f"当前工作目录: {os.getcwd()}")
self.log.emit(f"尝试查找脚本...")
# 从脚本名尝试查找脚本
script_name = os.path.basename(script_path)
actual_path = self._find_script(script_name)
if not os.path.exists(actual_path):
self.error.emit(f"无法找到脚本: {script_name}")
return False
script_path = actual_path
# 获取ArcGIS Pro的Python解释器路径
python_exe = self._get_arcgis_python() or sys.executable
self.log.emit(f"使用Python解释器: {python_exe}")
# 构建命令行参数
cmd = [python_exe, "-u", script_path]
# 添加参数
for key, value in args.items():
if key.startswith('--'):
param_key = key
else:
param_key = f"--{key}"
if isinstance(value, bool):
# 布尔参数
if value:
cmd.append(param_key)
elif value is not None:
# 其他参数
cmd.append(param_key)
cmd.append(str(value))
# 创建QProcess而不是subprocess.Popen
process = QProcess()
# 连接QProcess的信号
process.readyReadStandardOutput.connect(
lambda: self._handle_stdout(process)
)
process.readyReadStandardError.connect(
lambda: self._handle_stderr(process)
)
process.finished.connect(
lambda exit_code, exit_status: self._handle_finished(
exit_code, exit_status, process
)
)
# 设置工作目录
if working_dir:
process.setWorkingDirectory(working_dir)
# 通知开始
self.started.emit(f"开始执行脚本: {os.path.basename(script_path)}")
# 非阻塞启动进程
process.start(cmd[0], cmd[1:])
# 将进程对象保存在类变量中,防止被垃圾回收
self.current_process = process
# 返回True表示进程已启动结果将通过信号异步通知
return True
except Exception as e:
self.error.emit(f"执行脚本时出错: {str(e)}")
self.log.emit(traceback.format_exc())
return False
def _handle_stdout(self, process):
"""处理标准输出"""
try:
# 首先尝试使用utf-8解码
data = process.readAllStandardOutput().data()
try:
text = data.decode('utf-8')
except UnicodeDecodeError:
# utf-8解码失败尝试使用系统默认编码或GBK中文Windows常用
try:
text = data.decode('gbk')
except UnicodeDecodeError:
# 如果还是失败使用errors='replace'选项替换无法解码的字符
text = data.decode('utf-8', errors='replace')
if text:
for line in text.splitlines():
if line.strip():
self.log.emit(line.strip())
except Exception as e:
self.log.emit(f"处理标准输出时出错: {str(e)}")
def _handle_stderr(self, process):
"""处理错误输出"""
try:
# 首先尝试使用utf-8解码
data = process.readAllStandardError().data()
try:
text = data.decode('utf-8')
except UnicodeDecodeError:
# utf-8解码失败尝试使用系统默认编码或GBK中文Windows常用
try:
text = data.decode('gbk')
except UnicodeDecodeError:
# 如果还是失败使用errors='replace'选项替换无法解码的字符
text = data.decode('utf-8', errors='replace')
if text:
for line in text.splitlines():
if line.strip():
self.log.emit(f"错误: {line.strip()}")
except Exception as e:
self.log.emit(f"处理错误输出时出错: {str(e)}")
def _handle_finished(self, exit_code, exit_status, process):
"""处理进程结束"""
if exit_code == 0:
self.log.emit(f"脚本执行成功,返回代码: {exit_code}")
self.finished.emit(True)
else:
self.error.emit(f"脚本执行失败,返回代码: {exit_code}")
self.finished.emit(False)
# 清理进程
process.close()
if hasattr(self, 'current_process') and self.current_process == process:
self.current_process = None
def _get_arcgis_python(self):
"""获取ArcGIS Pro的Python解释器路径"""
try:
# 常见的ArcGIS Pro Python路径
arcgis_python_paths = [
r"C:\Program Files\ArcGIS\Pro\bin\Python\envs\arcgispro-py3\python.exe",
r"C:\Program Files\ArcGIS\Pro\bin\Python\python.exe"
]
# 检查环境变量
if 'ARCGISPRO_PYTHON' in os.environ:
arcgis_python_paths.insert(0, os.environ['ARCGISPRO_PYTHON'])
# 尝试每个可能的路径
for path in arcgis_python_paths:
if os.path.exists(path):
return path
return None # 如果找不到返回None
except Exception as e:
self.log.emit(f"获取ArcGIS Python路径失败: {str(e)}")
return None
def run_export_map(self, params):
"""
运行导出地图脚本
Args:
params: 参数字典,包含:
- config_file: 配置文件路径
- county_name: 区县名称
- polygon_list: 要导出的图层列表
- template_aprx_file: 模板文件路径
- output_path: 输出路径
- data_source_path: 数据源路径
- symbol_path: 符号文件路径
- force_regenerate: 是否强制重新生成
Returns:
返回脚本执行结果
"""
try:
# 检查必要参数是否存在
required_params = ['config_file', 'county_name', 'polygon_list',
'template_aprx_file', 'output_path',
'data_source_path', 'symbol_path']
for param in required_params:
if param not in params:
self.error.emit(f"缺少必要参数: {param}")
return False
# 查找脚本路径
script_path = self._find_script('export_map.py')
# 构建命令行参数
args = {
'config_file': params['config_file'],
'county_name': params['county_name'],
'polygon_list': json.dumps(params['polygon_list']), # 将图层列表序列化为JSON字符串
'template_aprx_file': params['template_aprx_file'],
'output_path': params['output_path'],
'data_source_path': params['data_source_path'],
'symbol_path': params['symbol_path']
}
# 添加可选参数
if 'force_regenerate' in params and params['force_regenerate']:
args['force_regenerate'] = True
# 使用外部窗口执行脚本
return self.run_script(script_path, args)
except Exception as e:
self.error.emit(f"执行导出地图脚本时出错: {str(e)}")
self.log.emit(traceback.format_exc())
return False
def run_batch_export_map(self, params):
"""运行批量导出地图脚本 """
try:
# 获取参数
if 'export_config' not in params:
self.error.emit(f"缺少必要参数: export_config")
return False
export_config = params['export_config']
county_name = params['county_name']
template_aprx_file = params['template_aprx_file']
output_folder = params['output_path']
data_source_path = params['data_source_path']
symbol_path = params['symbol_path']
polygon_list = params.get('polygon_list', []) # 如果指定了图层列表,则只导出这些图层
force_regenerate = params.get('force_regenerate', False) # 是否强制重新生成工程文件
# 如果没有指定图层列表,则使用配置文件中的所有图层
if not polygon_list:
polygon_list = list(export_config.keys())
self.log.emit(f"开始批量导出 {len(polygon_list)} 个图层")
# 准备脚本路径
script_path = self._find_script('export_map.py')
# 记录成功和失败的图层
success_count = 0
failed_count = 0
# 循环处理每个图层
for layer_name in polygon_list:
try:
self.log.emit(f"\n===== 开始处理图层: {layer_name} =====")
# 构建命令行参数
args = {
'config': params['config_file'],
'county': county_name,
'layer': layer_name,
'template': template_aprx_file,
'output': output_folder,
'data_source': data_source_path,
'symbol': symbol_path,
'force': force_regenerate
}
# 执行脚本
if self.run_script(script_path, args):
success_count += 1
else:
failed_count += 1
except Exception as e:
self.log.emit(f"处理图层 {layer_name} 时出错: {str(e)}")
failed_count += 1
# 通知完成
self.log.emit(f"\n===== 批量处理完成 =====")
self.log.emit(f"总计图层: {len(polygon_list)}")
self.log.emit(f"成功: {success_count}")
self.log.emit(f"失败: {failed_count}")
result = {
'total': len(polygon_list),
'success': success_count,
'failed': failed_count
}
self.finished.emit(result)
return result
except Exception as e:
self.error.emit(f"执行批量导出地图脚本时出错: {str(e)}")
self.log.emit(traceback.format_exc())
return False
def run_export_layout(self, params):
"""
运行导出布局脚本
Args:
params: 参数字典
Returns:
返回脚本执行结果
"""
try:
# 准备参数
script_path = self._find_script('export_layout.py')
# 构建命令行参数
args = {
'mode': 'single',
'aprx': params['aprx_path'],
'output': params['output_path'],
'format': params.get('export_format', 'PDF'),
'resolution': params.get('resolution', 300),
'name': params.get('output_name', '')
}
# 执行脚本
# return self.run_script(script_path, args)
except Exception as e:
self.error.emit(f"执行导出布局脚本时出错: {str(e)}")
self.log.emit(traceback.format_exc())
return False
def run_batch_export_layout(self, params):
"""
运行批量导出布局脚本
Args:
params: 参数字典
Returns:
返回脚本执行结果
"""
try:
# 准备参数
script_path = self._find_script('export_layout.py')
# 构建命令行参数
args = {
'aprx_file_list':json.dumps(params['aprx_file_list']),
'input_aprx_folder': params['input_aprx_folder'],
'output': params['output_image_path'],
'format': params.get('export_format', 'PDF'),
'resolution': params.get('resolution', 300),
'use_multiprocessing': params.get('use_multiprocessing', False),
'process_count': params.get('process_count', 2)
}
# 执行脚本
return self.run_script(script_path, args)
except Exception as e:
self.error.emit(f"执行批量导出布局脚本时出错: {str(e)}")
self.log.emit(traceback.format_exc())
return False
def run_process_raster(self, params):
"""
运行栅格处理脚本
Args:
params: 参数字典
Returns:
返回脚本执行结果
"""
try:
# 准备参数
script_path = self._find_script('raster_to_vector.py')
# 构建命令行参数
args = {
'command': 'process',
'input': params['input_raster'],
'output': params['output_raster'],
'format': params.get('output_format', 'TIFF'),
'compression': params.get('compression', 'LZW')
}
# 执行脚本
return self.run_script(script_path, args)
except Exception as e:
self.error.emit(f"执行栅格处理脚本时出错: {str(e)}")
self.log.emit(traceback.format_exc())
return False
def run_raster_batch_process(self, params):
"""
运行批量栅格处理脚本
Args:
params: 参数字典
Returns:
返回脚本执行结果
"""
try:
# 准备参数
script_path = self._find_script('raster_handler.py')
params["selected_files"] = json.dumps(params.get("selected_files"))
# 执行脚本
return self.run_script(script_path, params)
except Exception as e:
self.error.emit(f"执行批量栅格处理脚本时出错: {str(e)}")
self.log.emit(traceback.format_exc())
return False
def run_test_script(self, params):
try:
# 检查必要参数是否存在
required_params = ['message', 'count']
for param in required_params:
if param not in params:
self.error.emit(f"缺少必要参数: {param}")
return False
# 查找脚本路径
script_path = self._find_script('test_script.py')
# 构建命令行参数
args = {
'message': params['message'],
'count': params['count']
}
# 执行脚本
return self.run_script(script_path, args)
except Exception as e:
self.error.emit(f"执行导出地图脚本时出错: {str(e)}")
self.log.emit(traceback.format_exc())
return False