This commit is contained in:
2025-11-09 18:23:19 +08:00
parent f9edc3ea13
commit b5c3b17a1d
11 changed files with 766 additions and 39 deletions

3
.gitignore vendored Normal file
View File

@@ -0,0 +1,3 @@
/data
/output
*__pycache__

87
main.py
View File

@@ -1,49 +1,56 @@
# main.py
import arcpy
import os
import sys
import geopandas as gpd
import rasterio
from exactextract import exact_extract
# 将项目根目录添加到Python路径以便导入自定义模块
# 假设 main.py 在项目根目录
project_root = os.path.dirname(os.path.abspath(__file__))
if project_root not in sys.path:
sys.path.insert(0, project_root)
from src import config
from src.utils.logger_setup import logger
from src.analysis.spatial_analyzer import SpatialAnalyzer
from src.reporting.report_generator import ReportGenerator
def _check_arcpy_environment():
    """Verify the ArcPy environment is available; terminate the process otherwise.

    Also points PROJ_LIB at rasterio's bundled proj_data so rasterio and ArcPy
    do not conflict over CRS databases in the same interpreter.
    NOTE(review): previously GetInstallInfo() was called twice and the warning
    went to print() instead of the logger.
    """
    try:
        proj_lib_path = os.path.join(sys.prefix, 'Lib', 'site-packages', 'rasterio', 'proj_data')
        os.environ['PROJ_LIB'] = proj_lib_path
        install_info = arcpy.GetInstallInfo()  # call once, reuse below
        logger.info(f"ArcPy环境已加载: {install_info['ProductName']} {install_info['Version']}")
    except Exception as e:
        logger.warning(f"Warning: Could not automatically set PROJ_LIB. Please set it manually. Error: {e}")
        logger.critical(f"ArcPy环境未正确配置或加载失败: {str(e)}")
        logger.critical("请确保在ArcGIS Pro的Python环境中运行此脚本。")
        sys.exit(1)


def _run_zonal_statistics(raster_path, vector_path):
    """Run exactextract zonal statistics of *raster_path* over *vector_path*.

    Reprojects the vector layer to the raster CRS when they differ, merges the
    per-feature statistics back onto the GeoDataFrame, and writes a shapefile.
    """
    # Read the raster's coordinate reference system.
    with rasterio.open(raster_path) as src:
        raster_crs = src.crs
    gdf = gpd.read_file(vector_path)
    logger.info(f"原始矢量CRS: {gdf.crs}")
    logger.info(f"目标栅格CRS: {raster_crs}")
    if gdf.crs != raster_crs:
        logger.info("CRS不匹配正在转换矢量数据的CRS...")
        gdf = gdf.to_crs(raster_crs)
        logger.info("转换完成。")
    stats_to_calculate = ['mean', 'sum', 'count', 'min', 'max']
    # exact_extract accepts the GeoDataFrame directly and preserves row order,
    # so results can be assigned back positionally.
    results = exact_extract(raster_path, gdf, stats_to_calculate)
    for stat in stats_to_calculate:
        # Each result is a GeoJSON-like feature; stats live under 'properties'.
        gdf[stat] = [res['properties'][stat] for res in results]
    logger.info("分区统计结果:")
    logger.info(gdf.head())
    # NOTE(review): output name is hard-coded — presumably a debugging leftover;
    # consider moving it into src/config.py.
    gdf.to_file("ddddl.shp", driver='ESRI Shapefile', encoding='utf-8')


def main():
    """Project entry point: spatial analysis, Excel report, zonal statistics.

    Fixes over the previous revision: the closing banner used to be logged
    *before* the zonal-statistics stage ran, and the zonal-statistics code was
    interleaved with the analysis/report orchestration.
    """
    logger.info("==================================================")
    logger.info(" 地理处理与土壤属性分析项目启动 ")
    logger.info("==================================================")
    _check_arcpy_environment()

    # Hard-coded inputs for the zonal-statistics stage.
    # TODO(review): move these paths into src/config.py with the other paths.
    raster_path = "D:/工作/三普成果编制/出图数据/容县/栅格0925/AB/AB.tif"
    vector_path = "D:/测试文件夹/容县耕园林草.shp"

    # Stage 1: spatial analysis.
    logger.info("阶段一:开始执行空间分析...")
    analyzer = SpatialAnalyzer(config)
    analysis_success = analyzer.execute_analysis()

    # Stage 2: Excel report, only when stage 1 succeeded.
    if analysis_success:
        logger.info("阶段二空间分析完成开始生成Excel报告...")
        reporter = ReportGenerator(config)
        if reporter.generate_report():
            logger.info("阶段二Excel报告生成完毕")
        else:
            logger.error("阶段二Excel报告生成失败。")
    else:
        logger.error("阶段一:空间分析失败,请检查日志。")

    # Stage 3: raster zonal statistics.
    _run_zonal_statistics(raster_path, vector_path)

    logger.info("==================================================")
    logger.info(" 地理处理与土壤属性分析项目结束 ")
    logger.info("==================================================")


if __name__ == "__main__":
    main()

3
requirements.txt Normal file
View File

@@ -0,0 +1,3 @@
# NOTE: arcpy is not pip-installable; it ships with the ArcGIS Pro conda environment.
arcpy
pandas
openpyxl
# Required by main.py's zonal-statistics stage:
geopandas
rasterio
exactextract

1
src/__init__.py Normal file
View File

@@ -0,0 +1 @@
# src/__init__.py

1
src/analysis/__init__.py Normal file
View File

@@ -0,0 +1 @@
# src/analysis/__init__.py

View File

@@ -0,0 +1,142 @@
# src/analysis/data_manager.py
import arcpy
import os
import pandas as pd
from typing import Literal
from src.utils.logger_setup import logger
from src import config
class DataManager:
    """Handles all interaction with data sources (GDBs and folders).

    Pure data access — reading, writing and managing feature classes and
    tables; contains no business logic.
    """

    def __init__(self):
        self.logger = logger
        # Target GDB for all outputs, taken from the project configuration.
        self.output_gdb = config.OUTPUT_GDB
        self.logger.info(f"DataManager 初始化输出GDB: {self.output_gdb}")

    def get_feature_classes(self, input_location: str, shape_type: Literal['Point', 'Polyline', 'Polygon'] = "Point") -> list:
        """Collect all feature classes of *shape_type* from a folder or GDB.

        Args:
            input_location: a folder path or a path ending in ``.gdb``.
            shape_type: geometry type to keep (default ``"Point"``).

        Returns:
            List of full feature-class paths (empty when nothing matches or
            an error occurs — errors are logged, not raised).
        """
        feature_class_list = []
        self.logger.info(f"正在从 '{input_location}' 获取 '{shape_type}' 类型的要素类...")
        # arcpy.env.workspace is process-global state; restore it afterwards
        # so this call cannot silently redirect later geoprocessing.
        previous_workspace = arcpy.env.workspace
        try:
            arcpy.env.workspace = input_location
            if input_location.endswith('.gdb'):
                # GDB workspace: ListFeatureClasses filters by type directly.
                # It returns None for an empty workspace, hence the `or []`
                # (previously this raised TypeError, swallowed by the broad except).
                for fc in arcpy.ListFeatureClasses(feature_type=shape_type) or []:
                    feature_class_list.append(os.path.join(input_location, fc))
            else:
                # Plain folder: scan shapefiles and filter by described type.
                for shp in arcpy.ListFiles("*.shp") or []:
                    shp_path = os.path.join(input_location, shp)
                    if arcpy.Describe(shp_path).shapeType == shape_type:
                        feature_class_list.append(shp_path)
        except arcpy.ExecuteError:
            self.logger.error(f"ArcGIS工具执行错误: {arcpy.GetMessages(2)}")
        except Exception as e:
            self.logger.error(f"读取输入位置时出错: {str(e)}")
        finally:
            arcpy.env.workspace = previous_workspace
        self.logger.info(f"从 '{input_location}' 找到 {len(feature_class_list)} 个 '{shape_type}' 要素类。")
        return feature_class_list

    def save_output_feature_class(self, in_memory_fc: str, output_name: str) -> str | None:
        """Persist an in-memory feature class into the output GDB.

        Creates the GDB on first use and overwrites an existing target
        feature class of the same name.

        Args:
            in_memory_fc: source (typically ``in_memory/...``) feature class.
            output_name: name for the feature class inside the output GDB.

        Returns:
            The full output path, or None on failure (errors are logged).
        """
        output_path = self.output_gdb
        self.logger.info(f"正在将要素类 '{in_memory_fc}' 保存到 '{output_path}',名称为 '{output_name}'...")
        try:
            if not arcpy.Exists(output_path):
                # Lazily create the file GDB.
                parent_dir = os.path.dirname(output_path)
                gdb_name = os.path.basename(output_path).replace('.gdb', '')
                arcpy.management.CreateFileGDB(parent_dir, gdb_name)
                self.logger.info(f"创建GDB: {output_path}")
            output_fc = os.path.join(output_path, output_name)
            # Drop any stale result before copying.
            if arcpy.Exists(output_fc):
                arcpy.management.Delete(output_fc)
                self.logger.warning(f"已删除现有要素类: {output_fc}")
            arcpy.management.CopyFeatures(in_memory_fc, output_fc)
            self.logger.info(f"结果写入GDB: {output_fc}")
            return output_fc
        except arcpy.ExecuteError:
            self.logger.error(f"ArcGIS工具执行错误: {arcpy.GetMessages(2)}")
            return None
        except Exception as e:
            self.logger.error(f"写入输出时出错: {str(e)}")
            return None

    def gdb_table_to_dataframe(self, table_path: str) -> pd.DataFrame | None:
        """Read a GDB table into a pandas DataFrame.

        Geometry and OID fields are excluded; only attribute columns are read
        via a SearchCursor (streams rows without an intermediate NumPy array).

        Args:
            table_path: full path of the table inside a GDB.

        Returns:
            The DataFrame, or None when the table is missing or reading fails.
        """
        self.logger.info(f"正在从 '{table_path}' 读取表格到 DataFrame...")
        if not arcpy.Exists(table_path):
            self.logger.error(f"表格不存在: {table_path}")
            return None
        try:
            fields = [f.name for f in arcpy.ListFields(table_path) if f.type not in ('Geometry', 'OID')]
            with arcpy.da.SearchCursor(table_path, fields) as cursor:
                data = [row for row in cursor]
            df = pd.DataFrame(data, columns=fields)
            self.logger.info(f"成功从 '{table_path}' 读取 {len(df)} 行数据。")
            return df
        except arcpy.ExecuteError:
            self.logger.error(f"ArcGIS工具执行错误: {arcpy.GetMessages(2)}")
            return None
        except Exception as e:
            self.logger.error(f"读取GDB表格到DataFrame时出错: {str(e)}")
            return None

View File

@@ -0,0 +1,339 @@
# src/analysis/spatial_analyzer.py
import arcpy
import os
from src.utils.logger_setup import logger
from src import config
from src.analysis.data_manager import DataManager
class SpatialAnalyzer:
    """Core spatial-processing and statistics pipeline.

    Responsibilities: merge and deduplicate sample points, enrich them with
    polygon attributes via spatial joins, prune fields, and compute grouped
    summary statistics. Persistence goes through DataManager.
    """

    def __init__(self, config_module):
        self.logger = logger
        self.config = config_module
        self.data_manager = DataManager()
        self.logger.info("SpatialAnalyzer 初始化。")
        # ArcPy environment: allow overwrites, keep temporaries in memory.
        arcpy.env.overwriteOutput = True
        arcpy.env.workspace = "in_memory"
        self.final_enriched_points_path = None  # set by _save_final_results
        self.summary_table_path = None          # set by _save_final_results

    def execute_analysis(self) -> bool:
        """Single entry point; runs the whole pipeline in order.

        Returns:
            True when every stage succeeded, False otherwise. In-memory
            temporaries are cleaned up regardless of the outcome.
        """
        self.logger.info("开始执行空间分析流程...")
        try:
            # 1. Pre-flight validation of inputs and join fields.
            if not self._run_pre_analysis_checks():
                self.logger.error("预分析检查失败,分析终止。")
                return False
            # 2. Gather input point feature classes.
            initial_points = self._get_initial_point_features()
            if not initial_points:
                self.logger.error("未找到初始点要素,分析终止。")
                return False
            # 3. Merge and deduplicate.
            merged_deduplicated_points = self._merge_and_deduplicate_points(initial_points)
            if not merged_deduplicated_points:
                self.logger.error("合并与去重失败,分析终止。")
                return False
            # 4. Attach polygon attributes via spatial joins.
            enriched_points = self._enrich_attributes_by_spatial_join(merged_deduplicated_points)
            if not enriched_points:
                self.logger.error("空间连接失败,分析终止。")
                return False
            # 5. Keep only the configured fields.
            final_points_for_stats = self._cleanup_fields(enriched_points)
            if not final_points_for_stats:
                self.logger.error("字段清理失败,分析终止。")
                return False
            # 6. Grouped summary statistics.
            summary_table = self._calculate_summary_statistics(final_points_for_stats)
            if not summary_table:
                self.logger.error("汇总统计失败,分析终止。")
                return False
            # 7. Persist results to the output GDB.
            self._save_final_results(final_points_for_stats, summary_table)
            self.logger.info("空间分析流程成功完成。")
            return True
        except arcpy.ExecuteError:
            self.logger.error(f"ArcGIS工具执行错误: {arcpy.GetMessages(2)}")
            return False
        except Exception as e:
            self.logger.error(f"空间分析过程中发生未知错误: {str(e)}", exc_info=True)
            return False
        finally:
            self._cleanup_in_memory()

    def _run_pre_analysis_checks(self) -> bool:
        """Validate that input GDBs, join layers and join fields exist."""
        self.logger.info("执行预分析检查...")
        # 1. Input GDBs must exist.
        if not arcpy.Exists(self.config.INPUT_DATA_GDB):
            self.logger.error(f"输入数据GDB不存在: {self.config.INPUT_DATA_GDB}")
            return False
        if not arcpy.Exists(self.config.BASE_DATA_GDB):
            self.logger.error(f"基础数据GDB不存在: {self.config.BASE_DATA_GDB}")
            return False
        # 2. Every configured join layer and each of its fields must exist.
        for layer_name, field_names in self.config.JOIN_POLYGON_LAYERS.items():
            layer_path = os.path.join(self.config.BASE_DATA_GDB, layer_name)
            if not arcpy.Exists(layer_path):
                self.logger.error(f"空间连接图层不存在: {layer_path}")
                return False
            for field_name in field_names:
                # ListFields returns an empty list when the field is absent.
                if not arcpy.ListFields(layer_path, field_name):
                    self.logger.error(f"空间连接图层 '{layer_name}' 中缺少字段: '{field_name}'")
                    return False
        self.logger.info("预分析检查通过。")
        return True

    def _get_initial_point_features(self) -> list | None:
        """Fetch all point feature classes from the configured input GDB."""
        self.logger.info("正在获取初始点要素类...")
        point_features = self.data_manager.get_feature_classes(self.config.INPUT_DATA_GDB, shape_type="Point")
        if not point_features:
            self.logger.warning(f"在 '{self.config.INPUT_DATA_GDB}' 中未找到任何点要素类。")
            return None
        return point_features

    def _merge_and_deduplicate_points(self, point_features: list) -> str | None:
        """Merge all point feature classes and drop near-duplicate points.

        Returns the in-memory path of the merged result (geometry-only when
        the field-stripping step succeeds), or None on failure.
        """
        self.logger.info("正在合并点要素...")
        temp_merged_points = "in_memory/temp_merged_points"
        try:
            arcpy.management.Merge(point_features, temp_merged_points, field_match_mode="USE_FIRST_SCHEMA")
            merge_count = int(arcpy.management.GetCount(temp_merged_points).getOutput(0))
            self.logger.info(f"合并后要素数量: {merge_count}")
            self.logger.info("正在删除重复点...")
            # Points within 1 meter of each other count as duplicates.
            arcpy.management.DeleteIdentical(temp_merged_points, "Shape", "1 Meters")
            after_dedup_count = int(arcpy.management.GetCount(temp_merged_points).getOutput(0))
            self.logger.info(f"去重后要素数量: {after_dedup_count} (删除 {merge_count - after_dedup_count} 个重复点)")
            # Strip every attribute field so downstream joins start clean;
            # fall back to the merged data unchanged if stripping fails.
            try:
                merged_points = "in_memory/merged_points"
                arcpy.conversion.ExportFeatures(temp_merged_points, merged_points)
                del_fields = [f.name for f in arcpy.ListFields(merged_points)
                              if f.type not in ("Geometry", "OID") and f.name.upper() not in ("SHAPE", "OBJECTID", "FID")]
                if del_fields:
                    arcpy.management.DeleteField(merged_points, del_fields)
                current_data = merged_points
            except Exception as ex:
                self.logger.error(f"创建仅含几何的临时要素类失败: {str(ex)}")
                current_data = temp_merged_points
            return current_data
        except arcpy.ExecuteError:
            self.logger.error(f"合并或去重时ArcGIS工具执行错误: {arcpy.GetMessages(2)}")
            return None
        except Exception as e:
            self.logger.error(f"合并或去重过程中发生错误: {str(e)}")
            return None

    def _enrich_attributes_by_spatial_join(self, target_features: str) -> str | None:
        """Chain spatial joins to attach configured polygon attributes to points.

        Each configured layer is joined in turn; the output of one join is the
        target of the next. Returns the final in-memory feature class, or None.
        """
        self.logger.info("开始执行空间连接,为样点赋值多边形属性...")
        current_data = target_features
        for layer_name, field_names in self.config.JOIN_POLYGON_LAYERS.items():
            join_feature_path = os.path.join(self.config.BASE_DATA_GDB, layer_name)
            if not arcpy.Exists(join_feature_path):
                self.logger.warning(f"警告: 连接要素不存在,跳过: {join_feature_path}")
                continue
            temp_output = f"in_memory/joined_{layer_name}"
            self.logger.info(f"正在将 '{current_data}' 与 '{layer_name}' ({join_feature_path}) 进行空间连接...")
            try:
                # Field mapping: keep the target's fields plus only the
                # configured fields from the join layer.
                field_mappings = arcpy.FieldMappings()
                field_mappings.addTable(current_data)
                join_fields_to_add = []
                for field_name in field_names:
                    field_map = arcpy.FieldMap()
                    field_map.addInputField(join_feature_path, field_name)
                    field_mappings.addFieldMap(field_map)
                    join_fields_to_add.append(field_name)
                arcpy.analysis.SpatialJoin(
                    target_features=current_data,
                    join_features=join_feature_path,
                    out_feature_class=temp_output,
                    join_operation="JOIN_ONE_TO_ONE",
                    join_type="KEEP_ALL",
                    # CLOSEST (within 500 m) rather than INTERSECT, so points
                    # just outside a polygon (e.g. on a boundary) still match.
                    match_option="CLOSEST",
                    search_radius="500 Meters",
                    field_mapping=field_mappings
                )
                current_data = temp_output
                self.logger.info(f"与 '{layer_name}' 连接完成,连接字段: {', '.join(join_fields_to_add)}")
            except arcpy.ExecuteError:
                self.logger.error(f"与 '{layer_name}' 空间连接时ArcGIS工具执行错误: {arcpy.GetMessages(2)}")
                return None
            except Exception as e:
                self.logger.error(f"与 '{layer_name}' 空间连接过程中发生错误: {str(e)}")
                return None
        self.logger.info("所有空间连接完成。")
        return current_data

    def _cleanup_fields(self, feature_class: str) -> str | None:
        """Keep only configured soil/grouping fields plus geometry/OID fields.

        Returns a cleaned in-memory copy, or None on failure.
        """
        self.logger.info("正在清理和筛选字段...")
        output_fc = "in_memory/cleaned_fields_fc"
        try:
            # Work on a copy so the input stays intact.
            arcpy.management.CopyFeatures(feature_class, output_fc)
            # System/geometry fields that must always survive.
            fields_to_keep = ["Geometry", "OID", "SHAPE", "OBJECTID", "FID", "OBJECTID_1"]
            fields_to_keep.extend(self.config.KEEP_SOIL_FIELDS)
            fields_to_keep.extend(self.config.GROUP_BY_FIELDS)
            keep_upper = {f.upper() for f in fields_to_keep}
            all_fields = [f.name for f in arcpy.ListFields(output_fc)]
            # Case-insensitive comparison: GDB field casing is not reliable.
            fields_to_delete = [field for field in all_fields if field.upper() not in keep_upper]
            if fields_to_delete:
                arcpy.management.DeleteField(output_fc, fields_to_delete)
                self.logger.info(f"已删除字段: {', '.join(fields_to_delete)}")
            else:
                self.logger.info("无需删除额外字段。")
            # Warn about configured fields that never made it this far.
            missing_fields = [f for f in self.config.KEEP_SOIL_FIELDS + self.config.GROUP_BY_FIELDS if not arcpy.ListFields(output_fc, f)]
            if missing_fields:
                self.logger.warning(f"警告: 以下配置的字段在清理后缺失: {', '.join(missing_fields)}")
            return output_fc
        except arcpy.ExecuteError:
            self.logger.error(f"字段清理时ArcGIS工具执行错误: {arcpy.GetMessages(2)}")
            return None
        except Exception as e:
            self.logger.error(f"字段清理过程中发生错误: {str(e)}")
            return None

    def _calculate_summary_statistics(self, feature_class: str) -> str | None:
        """Grouped summary statistics over the configured soil fields.

        Returns the in-memory output table path, or None on failure.
        """
        self.logger.info("开始计算汇总统计...")
        output_table = os.path.join("in_memory", self.config.SUMMARY_TABLE_NAME)
        existing_fields = [f.name for f in arcpy.ListFields(feature_class)]
        # NOTE(review): TRZD is deliberately excluded from numeric statistics —
        # presumably because it is categorical; confirm with the data owner.
        # Guarded membership test: the previous revision called
        # list.remove("TRZD") unconditionally, which raised ValueError whenever
        # the field had been dropped earlier in the pipeline.
        if "TRZD" in existing_fields:
            existing_fields.remove("TRZD")
        # One [field, stat_type] pair per configured combination that exists.
        statistics_fields = [
            [field, stat_type]
            for field in self.config.STATISTICS_FIELDS
            if field in existing_fields
            for stat_type in self.config.STATISTICS_TYPE
        ]
        if not statistics_fields:
            self.logger.warning("未配置任何统计字段或统计类型,跳过汇总统计。")
            return None
        try:
            # All group-by fields must be present for the Statistics tool.
            for group_field in self.config.GROUP_BY_FIELDS:
                if not arcpy.ListFields(feature_class, group_field):
                    self.logger.error(f"分组字段 '{group_field}' 不存在于要素类中,无法进行统计。")
                    return None
            arcpy.analysis.Statistics(
                in_table=feature_class,
                out_table=output_table,
                statistics_fields=statistics_fields,
                case_field=self.config.GROUP_BY_FIELDS
            )
            self.logger.info(f"汇总统计完成,结果表: {output_table}")
            return output_table
        except arcpy.ExecuteError:
            self.logger.error(f"汇总统计时ArcGIS工具执行错误: {arcpy.GetMessages(2)}")
            return None
        except Exception as e:
            self.logger.error(f"汇总统计过程中发生错误: {str(e)}")
            return None

    def _save_final_results(self, enriched_points_fc: str, summary_table: str):
        """Persist enriched points and the statistics table to the output GDB.

        Side effects: sets self.final_enriched_points_path and
        self.summary_table_path on success of the respective copy.
        """
        self.logger.info("正在保存最终分析结果...")
        # Enriched sample points.
        self.final_enriched_points_path = self.data_manager.save_output_feature_class(
            enriched_points_fc, self.config.OUTPUT_FC_NAME
        )
        if self.final_enriched_points_path:
            self.logger.info(f"增强样点已保存到: {self.final_enriched_points_path}")
        else:
            self.logger.error("增强样点保存失败。")
        # Statistics table (overwrite an existing one).
        output_table_path = os.path.join(self.config.OUTPUT_GDB, self.config.SUMMARY_TABLE_NAME)
        try:
            if arcpy.Exists(output_table_path):
                arcpy.management.Delete(output_table_path)
                self.logger.warning(f"已删除现有统计表: {output_table_path}")
            arcpy.management.CopyRows(summary_table, output_table_path)
            self.summary_table_path = output_table_path
            self.logger.info(f"统计结果表已保存到: {self.summary_table_path}")
        except arcpy.ExecuteError:
            self.logger.error(f"保存统计结果表时ArcGIS工具执行错误: {arcpy.GetMessages(2)}")
        except Exception as e:
            self.logger.error(f"保存统计结果表过程中发生错误: {str(e)}")

    def _cleanup_in_memory(self):
        """Delete all temporaries from the ArcPy in-memory workspace."""
        self.logger.info("正在清理内存中的临时数据...")
        try:
            arcpy.management.Delete("in_memory")
            self.logger.info("内存工作空间已清理。")
        except Exception as e:
            # Best-effort cleanup: log, never raise from here.
            self.logger.warning(f"清理内存工作空间时出错: {str(e)}")

49
src/config.py Normal file
View File

@@ -0,0 +1,49 @@
# src/config.py
"""Central configuration for the soil-attribute analysis project.

Paths, layer/field names, statistics settings and logging level all live
here so the analysis and reporting modules stay free of hard-coded values.
"""
import os

# Project root = two levels above this file (src/ -> project root).
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

# --- Input / output locations ------------------------------------------------
# Data GDBs are expected under <project_root>/data/.
_DATA_DIR = os.path.join(PROJECT_ROOT, "data")
_OUTPUT_DIR = os.path.join(PROJECT_ROOT, "output")

# GDB holding the sample points to analyze.
INPUT_DATA_GDB = os.path.join(_DATA_DIR, "土壤属性统计表格数据.gdb")
# GDB holding base layers (administrative boundaries, land use, soil types).
BASE_DATA_GDB = os.path.join(_DATA_DIR, "土壤属性统计表格数据.gdb")

OUTPUT_GDB = os.path.join(_OUTPUT_DIR, "analysis_result.gdb")
OUTPUT_REPORT_EXCEL = os.path.join(_OUTPUT_DIR, "soil_analysis_report.xlsx")
LOG_FILE = os.path.join(_OUTPUT_DIR, "process.log")

# --- Spatial analysis --------------------------------------------------------
# Name of the enriched-points feature class written to OUTPUT_GDB.
OUTPUT_FC_NAME = "三普样点合并结果"
# Name of the grouped-statistics table written to OUTPUT_GDB.
SUMMARY_TABLE_NAME = "土壤属性统计表"

# Polygon layers to spatially join onto the sample points:
# {layer name in BASE_DATA_GDB: [attribute fields to carry over]}.
JOIN_POLYGON_LAYERS = {
    "兴宁区乡镇行政边界": ["XZQMC"],  # admin boundaries -> district name
    "地类图斑": ["DLBM"],            # land-use polygons -> land-use code
    "土壤类型图": ["YL", "TS"],       # soil-type map -> soil type fields
}

# Soil-attribute fields kept after the spatial joins.
KEEP_SOIL_FIELDS = [
    "AB", "ACU", "AMN", "AMO", "AS1", "AZN", "CEC", "ECA", "EMG", "TESE", "TN", "TP",
    "TK", "AFE", "AK", "AP", "OM", "FL", "SL", "NL", "PH", "GZCHD", "YXTCHD", "TRRZ", "TRZD",
    "YL", "TS"
]

# --- Statistics --------------------------------------------------------------
# Fields to summarize.
STATISTICS_FIELDS = [
    "PH", "OM", "TN", "TP", "TK", "CEC", "AB", "ACU", "AMN", "AMO", "AS1", "AZN", "ECA", "EMG", "TESE", "AFE", "AK", "AP", "FL", "SL", "NL", "GZCHD", "YXTCHD", "TRRZ", "TRZD"
]
# Statistic types (arcpy Statistics tool keywords, e.g. "MEAN", "STD", "VAR").
STATISTICS_TYPE = ["MEAN", "MAX", "MIN", "COUNT"]
# Grouping fields: district name, land-use code, soil type.
GROUP_BY_FIELDS = ["XZQMC", "DLBM", "TS"]

# --- Logging -----------------------------------------------------------------
# One of DEBUG / INFO / WARNING / ERROR / CRITICAL.
LOGGING_LEVEL = "INFO"

View File

@@ -0,0 +1 @@
# src/reporting/__init__.py

View File

@@ -0,0 +1,145 @@
# src/reporting/report_generator.py
import pandas as pd
import os
from src.utils.logger_setup import logger
from src import config
from src.analysis.data_manager import DataManager
from openpyxl import Workbook
from openpyxl.utils.dataframe import dataframe_to_rows
from openpyxl.styles import Font, Border, Side, Alignment
from openpyxl.worksheet.dimensions import ColumnDimension, DimensionHolder
from openpyxl.utils import get_column_letter
class ReportGenerator:
    """Turns the statistics table in the analysis GDB into a styled Excel report."""

    def __init__(self, config_module):
        self.logger = logger
        self.config = config_module
        self.data_manager = DataManager()
        self.logger.info("ReportGenerator 初始化。")

    def generate_report(self) -> bool:
        """Single entry point: load -> format -> write.

        Returns:
            True on success, False when data is missing/empty or writing fails.
        """
        self.logger.info("开始生成Excel报告...")
        try:
            # 1. Load the statistics table from the GDB.
            df_stats = self._load_data_from_gdb()
            if df_stats is None or df_stats.empty:
                self.logger.warning("未获取到统计数据或数据为空,无法生成报告。")
                return False
            # 2. Format column names.
            formatted_df = self._format_data(df_stats)
            # 3. Write the styled Excel file.
            self._write_to_excel(formatted_df)
            self.logger.info(f"Excel报告成功生成到: {self.config.OUTPUT_REPORT_EXCEL}")
            return True
        except Exception as e:
            self.logger.error(f"生成Excel报告过程中发生错误: {str(e)}", exc_info=True)
            return False

    def _load_data_from_gdb(self) -> pd.DataFrame | None:
        """Load the summary statistics table from the output GDB."""
        self.logger.info(f"正在从GDB加载统计结果表: {self.config.OUTPUT_GDB}/{self.config.SUMMARY_TABLE_NAME}")
        table_path = os.path.join(self.config.OUTPUT_GDB, self.config.SUMMARY_TABLE_NAME)
        return self.data_manager.gdb_table_to_dataframe(table_path)

    def _format_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Rename statistic columns for readability.

        The arcpy Statistics tool emits upper-case FIELD_STATTYPE column
        names (e.g. PH_MEAN); rename them to FIELD_stattype (e.g. PH_mean).
        """
        self.logger.info("正在格式化统计数据...")
        rename_map = {}
        for field in self.config.STATISTICS_FIELDS:
            for stat_type in self.config.STATISTICS_TYPE:
                original_col = f"{field}_{stat_type}".upper()
                rename_map[original_col] = f"{field}_{stat_type.lower()}"
        # Group-by columns keep their names (placeholder for future aliases).
        for group_field in self.config.GROUP_BY_FIELDS:
            rename_map.setdefault(group_field, group_field)
        df = df.rename(columns=rename_map)
        self.logger.info("数据格式化完成。")
        return df

    def _write_to_excel(self, df: pd.DataFrame):
        """Write *df* to the configured Excel path with styling.

        Applies: bold centered header, thin borders, centered cells,
        auto-fitted column widths, and a frozen header row.
        """
        self.logger.info(f"正在将数据写入Excel文件: {self.config.OUTPUT_REPORT_EXCEL}")
        # Ensure the output directory exists (guard the empty-dirname case).
        output_dir = os.path.dirname(self.config.OUTPUT_REPORT_EXCEL)
        if output_dir and not os.path.exists(output_dir):
            os.makedirs(output_dir)
        wb = Workbook()
        ws = wb.active
        if ws is None:
            self.logger.error("无法获取活动工作表,报告生成失败。")
            return
        ws.title = self.config.SUMMARY_TABLE_NAME
        # Shared style objects (previously re-created per cell).
        thin_border = Border(left=Side(style='thin'), right=Side(style='thin'),
                             top=Side(style='thin'), bottom=Side(style='thin'))
        centered = Alignment(horizontal="center", vertical="center")
        for r_idx, row in enumerate(dataframe_to_rows(df, index=False, header=True), 1):
            ws.append(row)
            for cell in ws[r_idx]:
                cell.alignment = centered
                cell.border = thin_border
                if r_idx == 1:  # header row gets bold text on top of the shared styles
                    cell.font = Font(bold=True)
        # Auto-fit column widths.
        # FIX: the previous revision replaced ws.column_dimensions with a fresh
        # DimensionHolder and passed the column *letter* to
        # ColumnDimension(min=..., max=...), which expect integer indices.
        # Setting widths through ws.column_dimensions is the documented way.
        for col_idx in range(ws.min_column, ws.max_column + 1):
            letter = get_column_letter(col_idx)
            max_length = max(
                (len(str(cell.value)) for cell in ws[letter] if cell.value is not None),
                default=0,
            )
            ws.column_dimensions[letter].width = (max_length + 2) * 1.2  # small margin
        # Freeze the header row.
        ws.freeze_panes = ws['A2']
        wb.save(self.config.OUTPUT_REPORT_EXCEL)
        self.logger.info("Excel文件保存成功。")

36
src/utils/logger_setup.py Normal file
View File

@@ -0,0 +1,36 @@
# src/utils/logger_setup.py
import logging
import os
from src import config
def setup_logging():
    """Configure project-wide logging to console and file.

    Reads the log path and level from src.config. Unknown level names fall
    back to INFO.

    Returns:
        logging.Logger: a module-scoped logger instance.
    """
    log_file = config.LOG_FILE
    log_level_str = config.LOGGING_LEVEL.upper()
    # Ensure the log directory exists. exist_ok avoids the race between a
    # separate exists() check and makedirs(); the `if log_dir` guard handles
    # a bare filename (empty dirname), which previously crashed makedirs.
    log_dir = os.path.dirname(log_file)
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)
    # getattr with a default tolerates misconfigured level names.
    log_level = getattr(logging, log_level_str, logging.INFO)
    # Configure the root logger; handlers cover both file and console.
    # NOTE: basicConfig is a no-op if the root logger is already configured.
    logging.basicConfig(
        level=log_level,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file, encoding='utf-8'),
            logging.StreamHandler(),  # console output
        ],
    )
    # Return a named logger rather than the root logger for modular use.
    return logging.getLogger(__name__)


# Configure logging at import time so every module can `from ... import logger`.
logger = setup_logging()