初始化

This commit is contained in:
2026-04-22 12:27:49 +08:00
commit 4857cb6e45
73 changed files with 20927 additions and 0 deletions

View File

@@ -0,0 +1,655 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
脚本运行器
用于从PyQt6界面调用独立脚本
"""
import os
import sys
import json
import subprocess
import traceback
from PyQt6.QtCore import QObject, pyqtSignal, QProcess, QThread, QRunnable, QThreadPool
class LambdaTask(QRunnable):
"""
用于在QThreadPool中执行任意函数的任务类
"""
def __init__(self, fn, *args, **kwargs):
super().__init__()
self.fn = fn
self.args = args
self.kwargs = kwargs
def run(self):
"""执行函数"""
try:
self.fn(*self.args, **self.kwargs)
except Exception as e:
print(f"任务执行错误: {str(e)}")
traceback.print_exc()
class ScriptRunner(QObject):
"""脚本运行器类,用于调用独立脚本处理任务"""
# 定义信号
started = pyqtSignal(str)
finished = pyqtSignal(object)
error = pyqtSignal(str)
log = pyqtSignal(str)
def __init__(self, parent=None):
super().__init__(parent)
# self.parent_ref = weakref.ref(parent) if parent else None
# 获取项目根目录
self.base_dir = self._get_base_dir()
self.log.emit(f"项目根目录: {self.base_dir}")
def _get_base_dir(self):
"""获取项目根目录"""
# 当前文件的目录
current_dir = os.path.dirname(os.path.abspath(__file__))
# 向上两级找到项目根目录 (ui -> tools -> project_root)
return os.path.dirname(os.path.dirname(current_dir))
def _find_script(self, script_name):
"""查找脚本文件"""
# 可能的脚本路径
possible_paths = [
os.path.join(self.base_dir, 'tools', 'core', script_name), # 从项目根目录查找
os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'core', script_name), # 从tools目录查找
os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'core', script_name)), # 相对于ui目录
os.path.join(os.getcwd(), 'tools', 'core', script_name) # 从当前工作目录查找
]
# 尝试每个可能的路径
for path in possible_paths:
path = os.path.normpath(path) # 规范化路径
if os.path.exists(path):
self.log.emit(f"找到脚本: {path}")
return path
# 如果找不到脚本,记录所有尝试的路径
self.log.emit("无法找到脚本,尝试过以下路径:")
for path in possible_paths:
path = os.path.normpath(path)
self.log.emit(f" - {path} (存在: {os.path.exists(path)})")
# 返回第一个路径(即使它不存在)
return possible_paths[0]
def run_script_external(self, script_path, args, working_dir=None):
"""
使用外部Python窗口运行脚本显示实时输出
Args:
script_path: 脚本路径
args: 命令行参数字典
working_dir: 工作目录
Returns:
返回启动是否成功
"""
try:
# 更详细的日志
self.log.emit(f"尝试在外部窗口运行脚本: {script_path}")
# 检查脚本是否存在
if not os.path.exists(script_path):
self.error.emit(f"脚本不存在: {script_path}")
self.log.emit(f"当前工作目录: {os.getcwd()}")
self.log.emit(f"尝试查找脚本...")
# 从脚本名尝试查找脚本
script_name = os.path.basename(script_path)
actual_path = self._find_script(script_name)
if not os.path.exists(actual_path):
self.error.emit(f"无法找到脚本: {script_name}")
return False
script_path = actual_path
self.log.emit(f"使用脚本路径: {script_path}")
# 获取ArcGIS Pro的Python解释器路径
python_exe = self._get_arcgis_python() or sys.executable
self.log.emit(f"使用Python解释器: {python_exe}")
# 构建命令行参数
cmd = [python_exe, script_path]
# 添加参数
for key, value in args.items():
if key.startswith('--'):
param_key = key
else:
param_key = f"--{key}"
if isinstance(value, bool):
# 布尔参数
if value:
cmd.append(param_key)
elif isinstance(value, list):
# 列表参数通过JSON序列化传递
cmd.append(param_key)
cmd.append(json.dumps(value))
elif value is not None:
# 其他参数
cmd.append(param_key)
cmd.append(str(value))
# 通知开始
self.started.emit(f"开始执行脚本: {os.path.basename(script_path)}")
# 创建临时批处理文件,为了能自动关闭窗口
batch_file_path = os.path.join(os.environ.get('TEMP', '.'), f"run_script_{os.getpid()}.bat")
# 构建批处理文件内容
batch_content = f"""@echo off
chcp 65001 > nul
echo 正在执行脚本: {os.path.basename(script_path)}...
echo 命令: {' '.join(cmd)}
echo.
"""
# 设置工作目录
if working_dir:
batch_content += f'cd /d "{working_dir}"\n'
# 添加执行命令
batch_content += f'"{python_exe}" "{script_path}"'
# 添加命令行参数
for i in range(2, len(cmd)):
# 检查参数是否需要引号
param = cmd[i]
if ' ' in param or ',' in param or ';' in param or '&' in param or '[' in param or ']' in param:
batch_content += f' "{param}"'
else:
batch_content += f' {param}'
# 添加执行完成后的提示和暂停
batch_content += "\necho.\necho 脚本执行完成窗口将在3秒后自动关闭...\ntimeout /t 3 >nul\n"
# 写入批处理文件
with open(batch_file_path, 'w', encoding='utf-8') as f:
f.write(batch_content)
self.log.emit(f"创建批处理文件: {batch_file_path}")
# 使用subprocess.Popen启动外部窗口
# 使用start命令在新窗口中运行批处理文件
start_cmd = f'start "执行脚本 - {os.path.basename(script_path)}" "{batch_file_path}"'
subprocess.Popen(start_cmd, shell=True)
# 模拟成功启动
self.log.emit("已在外部窗口启动脚本")
# 注意:由于是在外部窗口运行,我们无法获知脚本的实际结果
# 假设脚本已成功启动,但实际执行结果需要用户观察外部窗口
QThread.msleep(1000) # 等待1秒
self.finished.emit(True)
# 预定批处理文件的删除
def delete_batch_file():
try:
if os.path.exists(batch_file_path):
os.remove(batch_file_path)
self.log.emit(f"已删除临时批处理文件: {batch_file_path}")
except Exception as e:
self.log.emit(f"删除临时批处理文件失败: {str(e)}")
# 创建延迟删除任务
QThread.sleep(10) # 确保批处理文件有足够时间执行
delete_task = LambdaTask(delete_batch_file)
thread_pool = QThreadPool.globalInstance()
if thread_pool is not None:
thread_pool.start(delete_task)
return True
except Exception as e:
self.error.emit(f"启动外部脚本时出错: {str(e)}")
self.log.emit(traceback.format_exc())
return False
def run_script(self, script_path, args, working_dir=None):
"""使用非阻塞方式运行脚本"""
try:
# 检查脚本是否存在
if not os.path.exists(script_path):
self.error.emit(f"脚本不存在: {script_path}")
self.log.emit(f"当前工作目录: {os.getcwd()}")
self.log.emit(f"尝试查找脚本...")
# 从脚本名尝试查找脚本
script_name = os.path.basename(script_path)
actual_path = self._find_script(script_name)
if not os.path.exists(actual_path):
self.error.emit(f"无法找到脚本: {script_name}")
return False
script_path = actual_path
# 获取ArcGIS Pro的Python解释器路径
python_exe = self._get_arcgis_python() or sys.executable
self.log.emit(f"使用Python解释器: {python_exe}")
# 构建命令行参数
cmd = [python_exe, "-u", script_path]
# 添加参数
for key, value in args.items():
if key.startswith('--'):
param_key = key
else:
param_key = f"--{key}"
if isinstance(value, bool):
# 布尔参数
if value:
cmd.append(param_key)
elif value is not None:
# 其他参数
cmd.append(param_key)
cmd.append(str(value))
# 创建QProcess而不是subprocess.Popen
process = QProcess()
# 连接QProcess的信号
process.readyReadStandardOutput.connect(
lambda: self._handle_stdout(process)
)
process.readyReadStandardError.connect(
lambda: self._handle_stderr(process)
)
process.finished.connect(
lambda exit_code, exit_status: self._handle_finished(
exit_code, exit_status, process
)
)
# 设置工作目录
if working_dir:
process.setWorkingDirectory(working_dir)
# 通知开始
self.started.emit(f"开始执行脚本: {os.path.basename(script_path)}")
# 非阻塞启动进程
process.start(cmd[0], cmd[1:])
# 将进程对象保存在类变量中,防止被垃圾回收
self.current_process = process
# 返回True表示进程已启动结果将通过信号异步通知
return True
except Exception as e:
self.error.emit(f"执行脚本时出错: {str(e)}")
self.log.emit(traceback.format_exc())
return False
def _handle_stdout(self, process):
"""处理标准输出"""
try:
# 首先尝试使用utf-8解码
data = process.readAllStandardOutput().data()
try:
text = data.decode('utf-8')
except UnicodeDecodeError:
# utf-8解码失败尝试使用系统默认编码或GBK中文Windows常用
try:
text = data.decode('gbk')
except UnicodeDecodeError:
# 如果还是失败使用errors='replace'选项替换无法解码的字符
text = data.decode('utf-8', errors='replace')
if text:
for line in text.splitlines():
if line.strip():
self.log.emit(line.strip())
except Exception as e:
self.log.emit(f"处理标准输出时出错: {str(e)}")
def _handle_stderr(self, process):
"""处理错误输出"""
try:
# 首先尝试使用utf-8解码
data = process.readAllStandardError().data()
try:
text = data.decode('utf-8')
except UnicodeDecodeError:
# utf-8解码失败尝试使用系统默认编码或GBK中文Windows常用
try:
text = data.decode('gbk')
except UnicodeDecodeError:
# 如果还是失败使用errors='replace'选项替换无法解码的字符
text = data.decode('utf-8', errors='replace')
if text:
for line in text.splitlines():
if line.strip():
self.log.emit(f"错误: {line.strip()}")
except Exception as e:
self.log.emit(f"处理错误输出时出错: {str(e)}")
def _handle_finished(self, exit_code, exit_status, process):
"""处理进程结束"""
if exit_code == 0:
self.log.emit(f"脚本执行成功,返回代码: {exit_code}")
self.finished.emit(True)
else:
self.error.emit(f"脚本执行失败,返回代码: {exit_code}")
self.finished.emit(False)
# 清理进程
process.close()
if hasattr(self, 'current_process') and self.current_process == process:
self.current_process = None
def _get_arcgis_python(self):
"""获取ArcGIS Pro的Python解释器路径"""
try:
# 常见的ArcGIS Pro Python路径
arcgis_python_paths = [
r"C:\Program Files\ArcGIS\Pro\bin\Python\envs\arcgispro-py3\python.exe",
r"C:\Program Files\ArcGIS\Pro\bin\Python\python.exe"
]
# 检查环境变量
if 'ARCGISPRO_PYTHON' in os.environ:
arcgis_python_paths.insert(0, os.environ['ARCGISPRO_PYTHON'])
# 尝试每个可能的路径
for path in arcgis_python_paths:
if os.path.exists(path):
return path
return None # 如果找不到返回None
except Exception as e:
self.log.emit(f"获取ArcGIS Python路径失败: {str(e)}")
return None
def run_export_map(self, params):
"""
运行导出地图脚本
Args:
params: 参数字典,包含:
- config_file: 配置文件路径
- county_name: 区县名称
- polygon_list: 要导出的图层列表
- template_aprx_file: 模板文件路径
- output_path: 输出路径
- data_source_path: 数据源路径
- symbol_path: 符号文件路径
- force_regenerate: 是否强制重新生成
Returns:
返回脚本执行结果
"""
try:
# 检查必要参数是否存在
required_params = ['config_file', 'county_name', 'polygon_list',
'template_aprx_file', 'output_path',
'data_source_path', 'symbol_path']
for param in required_params:
if param not in params:
self.error.emit(f"缺少必要参数: {param}")
return False
# 查找脚本路径
script_path = self._find_script('export_map.py')
# 构建命令行参数
args = {
'config_file': params['config_file'],
'county_name': params['county_name'],
'polygon_list': json.dumps(params['polygon_list']), # 将图层列表序列化为JSON字符串
'template_aprx_file': params['template_aprx_file'],
'output_path': params['output_path'],
'data_source_path': params['data_source_path'],
'symbol_path': params['symbol_path']
}
# 添加可选参数
if 'force_regenerate' in params and params['force_regenerate']:
args['force_regenerate'] = True
# 使用外部窗口执行脚本
return self.run_script(script_path, args)
except Exception as e:
self.error.emit(f"执行导出地图脚本时出错: {str(e)}")
self.log.emit(traceback.format_exc())
return False
def run_batch_export_map(self, params):
"""运行批量导出地图脚本 """
try:
# 获取参数
if 'export_config' not in params:
self.error.emit(f"缺少必要参数: export_config")
return False
export_config = params['export_config']
county_name = params['county_name']
template_aprx_file = params['template_aprx_file']
output_folder = params['output_path']
data_source_path = params['data_source_path']
symbol_path = params['symbol_path']
polygon_list = params.get('polygon_list', []) # 如果指定了图层列表,则只导出这些图层
force_regenerate = params.get('force_regenerate', False) # 是否强制重新生成工程文件
# 如果没有指定图层列表,则使用配置文件中的所有图层
if not polygon_list:
polygon_list = list(export_config.keys())
self.log.emit(f"开始批量导出 {len(polygon_list)} 个图层")
# 准备脚本路径
script_path = self._find_script('export_map.py')
# 记录成功和失败的图层
success_count = 0
failed_count = 0
# 循环处理每个图层
for layer_name in polygon_list:
try:
self.log.emit(f"\n===== 开始处理图层: {layer_name} =====")
# 构建命令行参数
args = {
'config': params['config_file'],
'county': county_name,
'layer': layer_name,
'template': template_aprx_file,
'output': output_folder,
'data_source': data_source_path,
'symbol': symbol_path,
'force': force_regenerate
}
# 执行脚本
if self.run_script(script_path, args):
success_count += 1
else:
failed_count += 1
except Exception as e:
self.log.emit(f"处理图层 {layer_name} 时出错: {str(e)}")
failed_count += 1
# 通知完成
self.log.emit(f"\n===== 批量处理完成 =====")
self.log.emit(f"总计图层: {len(polygon_list)}")
self.log.emit(f"成功: {success_count} 个")
self.log.emit(f"失败: {failed_count} 个")
result = {
'total': len(polygon_list),
'success': success_count,
'failed': failed_count
}
self.finished.emit(result)
return result
except Exception as e:
self.error.emit(f"执行批量导出地图脚本时出错: {str(e)}")
self.log.emit(traceback.format_exc())
return False
def run_export_layout(self, params):
"""
运行导出布局脚本
Args:
params: 参数字典
Returns:
返回脚本执行结果
"""
try:
# 准备参数
script_path = self._find_script('export_layout.py')
# 构建命令行参数
args = {
'mode': 'single',
'aprx': params['aprx_path'],
'output': params['output_path'],
'format': params.get('export_format', 'PDF'),
'resolution': params.get('resolution', 300),
'name': params.get('output_name', '')
}
# 执行脚本
# return self.run_script(script_path, args)
except Exception as e:
self.error.emit(f"执行导出布局脚本时出错: {str(e)}")
self.log.emit(traceback.format_exc())
return False
def run_batch_export_layout(self, params):
"""
运行批量导出布局脚本
Args:
params: 参数字典
Returns:
返回脚本执行结果
"""
try:
# 准备参数
script_path = self._find_script('export_layout.py')
# 构建命令行参数
args = {
'aprx_file_list':json.dumps(params['aprx_file_list']),
'input_aprx_folder': params['input_aprx_folder'],
'output': params['output_image_path'],
'format': params.get('export_format', 'PDF'),
'resolution': params.get('resolution', 300),
'use_multiprocessing': params.get('use_multiprocessing', False),
'process_count': params.get('process_count', 2)
}
# 执行脚本
return self.run_script(script_path, args)
except Exception as e:
self.error.emit(f"执行批量导出布局脚本时出错: {str(e)}")
self.log.emit(traceback.format_exc())
return False
def run_process_raster(self, params):
"""
运行栅格处理脚本
Args:
params: 参数字典
Returns:
返回脚本执行结果
"""
try:
# 准备参数
script_path = self._find_script('raster_to_vector.py')
# 构建命令行参数
args = {
'command': 'process',
'input': params['input_raster'],
'output': params['output_raster'],
'format': params.get('output_format', 'TIFF'),
'compression': params.get('compression', 'LZW')
}
# 执行脚本
return self.run_script(script_path, args)
except Exception as e:
self.error.emit(f"执行栅格处理脚本时出错: {str(e)}")
self.log.emit(traceback.format_exc())
return False
def run_raster_batch_process(self, params):
"""
运行批量栅格处理脚本
Args:
params: 参数字典
Returns:
返回脚本执行结果
"""
try:
# 准备参数
script_path = self._find_script('raster_handler.py')
params["selected_files"] = json.dumps(params.get("selected_files"))
# 执行脚本
return self.run_script(script_path, params)
except Exception as e:
self.error.emit(f"执行批量栅格处理脚本时出错: {str(e)}")
self.log.emit(traceback.format_exc())
return False
def run_test_script(self, params):
try:
# 检查必要参数是否存在
required_params = ['message', 'count']
for param in required_params:
if param not in params:
self.error.emit(f"缺少必要参数: {param}")
return False
# 查找脚本路径
script_path = self._find_script('test_script.py')
# 构建命令行参数
args = {
'message': params['message'],
'count': params['count']
}
# 执行脚本
return self.run_script(script_path, args)
except Exception as e:
self.error.emit(f"执行导出地图脚本时出错: {str(e)}")
self.log.emit(traceback.format_exc())
return False