# main.py
import arcpy
import os
import sys

# Put the project root on sys.path so that the local ``src`` package can be
# imported (main.py is assumed to sit at the project root).
project_root = os.path.dirname(os.path.abspath(__file__))
if project_root not in sys.path:
    sys.path.insert(0, project_root)

from src import config
from src.utils.logger_setup import logger
from src.analysis.spatial_analyzer import SpatialAnalyzer
from src.reporting.report_generator import ReportGenerator


def main():
    """Project entry point: run the spatial analysis, then the Excel report."""
    logger.info("==================================================")
    logger.info(" 地理处理与土壤属性分析项目启动 ")
    logger.info("==================================================")

    # Abort early unless a working ArcPy installation is available.
    try:
        install_info = arcpy.GetInstallInfo()
        logger.info(f"ArcPy环境已加载: {install_info['ProductName']} {install_info['Version']}")
    except Exception as e:
        logger.critical(f"ArcPy环境未正确配置或加载失败: {str(e)}")
        logger.critical("请确保在ArcGIS Pro的Python环境中运行此脚本。")
        sys.exit(1)

    # Stage 1: spatial analysis. Stage 2 (reporting) only runs on success.
    logger.info("阶段一:开始执行空间分析...")
    if not SpatialAnalyzer(config).execute_analysis():
        logger.error("阶段一:空间分析失败,请检查日志。")
    else:
        logger.info("阶段二:空间分析完成,开始生成Excel报告...")
        if ReportGenerator(config).generate_report():
            logger.info("阶段二:Excel报告生成完毕!")
        else:
            logger.error("阶段二:Excel报告生成失败。")

    logger.info("==================================================")
    logger.info(" 地理处理与土壤属性分析项目结束 ")
    logger.info("==================================================")


if __name__ == "__main__":
    main()
# src/analysis/data_manager.py
import arcpy
import os
import pandas as pd
from typing import Literal
from src.utils.logger_setup import logger
from src import config


class DataManager:
    """
    Handles every interaction with the data sources (GDBs and folders):
    reading, writing and managing datasets. It holds no business logic —
    it only fetches and stores data.
    """

    def __init__(self):
        self.logger = logger
        self.output_gdb = config.OUTPUT_GDB
        self.logger.info(f"DataManager 初始化,输出GDB: {self.output_gdb}")

    def get_feature_classes(self, input_location: str,
                            shape_type: Literal['Point', 'Polyline', 'Polygon'] = "Point") -> list:
        """
        Collect every feature class of the given geometry type from a folder
        or a file geodatabase.

        Args:
            input_location: Input path (folder or ``.gdb``).
            shape_type: Geometry type to collect (default ``"Point"``).

        Returns:
            list: Full paths of the matching feature classes (empty on error).
        """
        feature_class_list = []
        self.logger.info(f"正在从 '{input_location}' 获取 '{shape_type}' 类型的要素类...")

        try:
            if input_location.endswith('.gdb'):
                # GDB workspace. Bug fix: ListFeatureClasses returns None for
                # an empty/invalid workspace — the original iterated None and
                # raised TypeError, which was swallowed by the generic handler.
                arcpy.env.workspace = input_location
                feature_classes = arcpy.ListFeatureClasses(feature_type=shape_type) or []
                for fc in feature_classes:
                    feature_class_list.append(os.path.join(input_location, fc))
            else:
                # Folder workspace: scan shapefiles and filter by geometry type.
                arcpy.env.workspace = input_location
                shapefiles = arcpy.ListFiles("*.shp") or []  # same None guard as above
                for shp in shapefiles:
                    shp_path = os.path.join(input_location, shp)
                    desc = arcpy.Describe(shp_path)
                    if desc.shapeType == shape_type:
                        feature_class_list.append(shp_path)
        except arcpy.ExecuteError:
            self.logger.error(f"ArcGIS工具执行错误: {arcpy.GetMessages(2)}")
        except Exception as e:
            self.logger.error(f"读取输入位置时出错: {str(e)}")

        self.logger.info(f"在 '{input_location}' 找到 {len(feature_class_list)} 个 '{shape_type}' 要素类。")
        return feature_class_list

    def save_output_feature_class(self, in_memory_fc: str, output_name: str) -> str | None:
        """
        Persist an in-memory feature class into the output GDB (creating the
        GDB on first use and overwriting any existing feature class).

        Args:
            in_memory_fc: The in-memory feature class to persist.
            output_name: Name of the output feature class.

        Returns:
            str | None: Full path of the saved feature class, or None on failure.
        """
        output_path = self.output_gdb
        self.logger.info(f"正在将要素类 '{in_memory_fc}' 保存到 '{output_path}',名称为 '{output_name}'...")

        try:
            # Lazily create the output GDB the first time we write to it.
            if not arcpy.Exists(output_path):
                parent_dir = os.path.dirname(output_path)
                gdb_name = os.path.basename(output_path).replace('.gdb', '')
                arcpy.management.CreateFileGDB(parent_dir, gdb_name)
                self.logger.info(f"创建GDB: {output_path}")

            output_fc = os.path.join(output_path, output_name)

            # Overwrite semantics: drop an existing feature class of that name.
            if arcpy.Exists(output_fc):
                arcpy.management.Delete(output_fc)
                self.logger.warning(f"已删除现有要素类: {output_fc}")

            arcpy.management.CopyFeatures(in_memory_fc, output_fc)
            self.logger.info(f"结果写入GDB: {output_fc}")
            return output_fc
        except arcpy.ExecuteError:
            self.logger.error(f"ArcGIS工具执行错误: {arcpy.GetMessages(2)}")
            return None
        except Exception as e:
            self.logger.error(f"写入输出时出错: {str(e)}")
            return None

    def gdb_table_to_dataframe(self, table_path: str) -> pd.DataFrame | None:
        """
        Read a table stored in a GDB into a pandas DataFrame.

        Geometry and OID fields are excluded — only attribute columns are read.

        Args:
            table_path: Full path of the table inside the GDB.

        Returns:
            pd.DataFrame | None: The table contents, or None on failure.
        """
        self.logger.info(f"正在从 '{table_path}' 读取表格到 DataFrame...")
        if not arcpy.Exists(table_path):
            self.logger.error(f"表格不存在: {table_path}")
            return None

        try:
            # Read via SearchCursor rather than TableToNumPyArray: it copes
            # with NULLs and text fields without dtype gymnastics.
            fields = [f.name for f in arcpy.ListFields(table_path)
                      if f.type not in ('Geometry', 'OID')]  # skip geometry/OID
            data = []
            with arcpy.da.SearchCursor(table_path, fields) as cursor:
                for row in cursor:
                    data.append(row)

            df = pd.DataFrame(data, columns=fields)
            self.logger.info(f"成功从 '{table_path}' 读取 {len(df)} 行数据。")
            return df
        except arcpy.ExecuteError:
            self.logger.error(f"ArcGIS工具执行错误: {arcpy.GetMessages(2)}")
            return None
        except Exception as e:
            self.logger.error(f"读取GDB表格到DataFrame时出错: {str(e)}")
            return None
# src/analysis/spatial_analyzer.py
import arcpy
import os
from src.utils.logger_setup import logger
from src import config
from src.analysis.data_manager import DataManager


class SpatialAnalyzer:
    """
    Core spatial-processing and statistics pipeline: point enrichment via
    spatial joins, field cleanup, and grouped summary statistics.
    """

    def __init__(self, config_module):
        self.logger = logger
        self.config = config_module
        self.data_manager = DataManager()
        self.logger.info("SpatialAnalyzer 初始化。")

        # ArcPy environment: overwrite outputs and default to the in-memory
        # workspace so intermediate datasets are cheap and easy to discard.
        arcpy.env.overwriteOutput = True
        arcpy.env.workspace = "in_memory"

        self.final_enriched_points_path = None  # path of the saved enriched points
        self.summary_table_path = None          # path of the saved summary table

    def execute_analysis(self) -> bool:
        """
        Sole entry point of the analysis: runs the whole pipeline in order
        (checks → load → merge/dedup → join → cleanup → stats → save).

        Returns:
            bool: True when the whole pipeline succeeded.
        """
        self.logger.info("开始执行空间分析流程...")
        try:
            # 1. Preconditions (inputs and join fields must exist).
            if not self._run_pre_analysis_checks():
                self.logger.error("预分析检查失败,分析终止。")
                return False

            # 2. Gather the input point feature classes.
            initial_points = self._get_initial_point_features()
            if not initial_points:
                self.logger.error("未找到初始点要素,分析终止。")
                return False

            # 3. Merge them and drop near-duplicate points.
            merged_deduplicated_points = self._merge_and_deduplicate_points(initial_points)
            if not merged_deduplicated_points:
                self.logger.error("合并与去重失败,分析终止。")
                return False

            # 4. Enrich with polygon attributes via spatial joins.
            enriched_points = self._enrich_attributes_by_spatial_join(merged_deduplicated_points)
            if not enriched_points:
                self.logger.error("空间连接失败,分析终止。")
                return False

            # 5. Keep only the configured fields.
            final_points_for_stats = self._cleanup_fields(enriched_points)
            if not final_points_for_stats:
                self.logger.error("字段清理失败,分析终止。")
                return False

            # 6. Grouped summary statistics.
            summary_table = self._calculate_summary_statistics(final_points_for_stats)
            if not summary_table:
                self.logger.error("汇总统计失败,分析终止。")
                return False

            # 7. Persist results to the output GDB.
            self._save_final_results(final_points_for_stats, summary_table)

            self.logger.info("空间分析流程成功完成。")
            return True
        except arcpy.ExecuteError:
            self.logger.error(f"ArcGIS工具执行错误: {arcpy.GetMessages(2)}")
            return False
        except Exception as e:
            self.logger.error(f"空间分析过程中发生未知错误: {str(e)}", exc_info=True)
            return False
        finally:
            # Always discard in-memory scratch data, even on failure.
            self._cleanup_in_memory()

    def _run_pre_analysis_checks(self) -> bool:
        """Verify input GDBs, join layers and their fields exist before running."""
        self.logger.info("执行预分析检查...")

        # 1. Input GDBs must exist.
        if not arcpy.Exists(self.config.INPUT_DATA_GDB):
            self.logger.error(f"输入数据GDB不存在: {self.config.INPUT_DATA_GDB}")
            return False
        if not arcpy.Exists(self.config.BASE_DATA_GDB):
            self.logger.error(f"基础数据GDB不存在: {self.config.BASE_DATA_GDB}")
            return False

        # 2. Every configured join polygon layer and each of its fields must exist.
        for layer_name, field_names in self.config.JOIN_POLYGON_LAYERS.items():
            layer_path = os.path.join(self.config.BASE_DATA_GDB, layer_name)
            if not arcpy.Exists(layer_path):
                self.logger.error(f"空间连接图层不存在: {layer_path}")
                return False
            for field_name in field_names:
                if not arcpy.ListFields(layer_path, field_name):
                    self.logger.error(f"空间连接图层 '{layer_name}' 中缺少字段: '{field_name}'")
                    return False

        self.logger.info("预分析检查通过。")
        return True

    def _get_initial_point_features(self) -> list | None:
        """Fetch all point feature classes from the configured input GDB."""
        self.logger.info("正在获取初始点要素类...")
        point_features = self.data_manager.get_feature_classes(
            self.config.INPUT_DATA_GDB, shape_type="Point")
        if not point_features:
            self.logger.warning(f"在 '{self.config.INPUT_DATA_GDB}' 中未找到任何点要素类。")
            return None
        return point_features

    def _merge_and_deduplicate_points(self, point_features: list) -> str | None:
        """
        Merge all point feature classes and drop duplicate points
        (points within 1 m of each other), then strip attribute fields
        so only geometry survives to the enrichment stage.
        """
        self.logger.info("正在合并点要素...")
        temp_merged_points = "in_memory/temp_merged_points"
        try:
            arcpy.management.Merge(point_features, temp_merged_points,
                                   field_match_mode="USE_FIRST_SCHEMA")
            merge_count = int(arcpy.management.GetCount(temp_merged_points).getOutput(0))
            self.logger.info(f"合并后要素数量: {merge_count}")

            self.logger.info("正在删除重复点...")
            arcpy.management.DeleteIdentical(temp_merged_points, "Shape", "1 Meters")
            after_dedup_count = int(arcpy.management.GetCount(temp_merged_points).getOutput(0))
            self.logger.info(f"去重后要素数量: {after_dedup_count} (删除 {merge_count - after_dedup_count} 个重复点)")

            # Keep only geometry/OID fields; everything else is re-attached
            # later by the spatial joins.
            try:
                merged_points = "in_memory/merged_points"
                arcpy.conversion.ExportFeatures(temp_merged_points, merged_points)

                del_fields = [f.name for f in arcpy.ListFields(merged_points)
                              if f.type not in ("Geometry", "OID")
                              and f.name.upper() not in ("SHAPE", "OBJECTID", "FID")]
                if del_fields:
                    arcpy.management.DeleteField(merged_points, del_fields)

                current_data = merged_points
            except Exception as ex:
                # Best effort: fall back to the merged data with its fields intact.
                self.logger.error(f"创建仅含几何的临时要素类失败: {str(ex)}")
                current_data = temp_merged_points

            return current_data
        except arcpy.ExecuteError:
            self.logger.error(f"合并或去重时ArcGIS工具执行错误: {arcpy.GetMessages(2)}")
            return None
        except Exception as e:
            self.logger.error(f"合并或去重过程中发生错误: {str(e)}")
            return None

    def _enrich_attributes_by_spatial_join(self, target_features: str) -> str | None:
        """
        Attach polygon attributes (administrative region, land use, soil type)
        to the sample points through one spatial join per configured layer.
        """
        self.logger.info("开始执行空间连接,为样点赋值多边形属性...")
        current_data = target_features

        for layer_name, field_names in self.config.JOIN_POLYGON_LAYERS.items():
            join_feature_path = os.path.join(self.config.BASE_DATA_GDB, layer_name)

            if not arcpy.Exists(join_feature_path):
                self.logger.warning(f"警告: 连接要素不存在,跳过: {join_feature_path}")
                continue

            temp_output = f"in_memory/joined_{layer_name}"
            self.logger.info(f"正在将 '{current_data}' 与 '{layer_name}' ({join_feature_path}) 进行空间连接...")

            try:
                # Field mapping: keep the target's fields plus only the
                # configured fields from the join layer.
                field_mappings = arcpy.FieldMappings()
                field_mappings.addTable(current_data)

                join_fields_to_add = []
                for field_name in field_names:
                    field_map = arcpy.FieldMap()
                    field_map.addInputField(join_feature_path, field_name)
                    field_mappings.addFieldMap(field_map)
                    join_fields_to_add.append(field_name)

                arcpy.analysis.SpatialJoin(
                    target_features=current_data,
                    join_features=join_feature_path,
                    out_feature_class=temp_output,
                    join_operation="JOIN_ONE_TO_ONE",
                    join_type="KEEP_ALL",
                    # CLOSEST within 500 m: a point slightly outside all
                    # polygons still picks up its nearest polygon's attributes.
                    match_option="CLOSEST",
                    search_radius="500 Meters",
                    field_mapping=field_mappings
                )
                current_data = temp_output
                self.logger.info(f"与 '{layer_name}' 连接完成,连接字段: {', '.join(join_fields_to_add)}。")
            except arcpy.ExecuteError:
                self.logger.error(f"与 '{layer_name}' 空间连接时ArcGIS工具执行错误: {arcpy.GetMessages(2)}")
                return None
            except Exception as e:
                self.logger.error(f"与 '{layer_name}' 空间连接过程中发生错误: {str(e)}")
                return None

        self.logger.info("所有空间连接完成。")
        return current_data

    def _cleanup_fields(self, feature_class: str) -> str | None:
        """
        Keep only the configured soil-attribute fields, the group-by fields,
        and the mandatory geometry/OID fields; delete everything else.
        """
        self.logger.info("正在清理和筛选字段...")
        output_fc = "in_memory/cleaned_fields_fc"
        try:
            # Work on a copy so the input stays untouched.
            arcpy.management.CopyFeatures(feature_class, output_fc)

            # Whitelist: system fields + configured attribute/group fields.
            fields_to_keep = ["Geometry", "OID", "SHAPE", "OBJECTID", "FID", "OBJECTID_1"]
            fields_to_keep.extend(self.config.KEEP_SOIL_FIELDS)
            fields_to_keep.extend(self.config.GROUP_BY_FIELDS)
            keep_upper = {f.upper() for f in fields_to_keep}

            all_fields = [f.name for f in arcpy.ListFields(output_fc)]
            fields_to_delete = [field for field in all_fields
                                if field.upper() not in keep_upper]

            if fields_to_delete:
                arcpy.management.DeleteField(output_fc, fields_to_delete)
                self.logger.info(f"已删除字段: {', '.join(fields_to_delete)}")
            else:
                self.logger.info("无需删除额外字段。")

            # Warn about configured fields that never made it through the joins.
            missing_fields = [f for f in self.config.KEEP_SOIL_FIELDS + self.config.GROUP_BY_FIELDS
                              if not arcpy.ListFields(output_fc, f)]
            if missing_fields:
                self.logger.warning(f"警告: 以下配置的字段在清理后缺失: {', '.join(missing_fields)}")

            return output_fc
        except arcpy.ExecuteError:
            self.logger.error(f"字段清理时ArcGIS工具执行错误: {arcpy.GetMessages(2)}")
            return None
        except Exception as e:
            self.logger.error(f"字段清理过程中发生错误: {str(e)}")
            return None

    def _calculate_summary_statistics(self, feature_class: str) -> str | None:
        """Group the sample points by the configured fields and compute statistics."""
        self.logger.info("开始计算汇总统计...")
        output_table = os.path.join("in_memory", self.config.SUMMARY_TABLE_NAME)

        # Build the [field, stat] pairs for the Statistics tool, restricted
        # to fields actually present on the feature class.
        statistics_fields = []
        exists_field = [f.name for f in arcpy.ListFields(feature_class)]
        # NOTE(review): TRZD is deliberately excluded from the statistics
        # (presumably non-numeric — confirm). Bug fix: the original called
        # list.remove() unconditionally, raising ValueError whenever the
        # field was absent and aborting the whole statistics step.
        if "TRZD" in exists_field:
            exists_field.remove("TRZD")
        for field in self.config.STATISTICS_FIELDS:
            if field not in exists_field:
                continue
            for stat_type in self.config.STATISTICS_TYPE:
                statistics_fields.append([field, stat_type])

        if not statistics_fields:
            self.logger.warning("未配置任何统计字段或统计类型,跳过汇总统计。")
            return None

        try:
            # All group-by fields must exist, otherwise Statistics would fail.
            for group_field in self.config.GROUP_BY_FIELDS:
                if not arcpy.ListFields(feature_class, group_field):
                    self.logger.error(f"分组字段 '{group_field}' 不存在于要素类中,无法进行统计。")
                    return None

            arcpy.analysis.Statistics(
                in_table=feature_class,
                out_table=output_table,
                statistics_fields=statistics_fields,
                case_field=self.config.GROUP_BY_FIELDS
            )
            self.logger.info(f"汇总统计完成,结果表: {output_table}")
            return output_table
        except arcpy.ExecuteError:
            self.logger.error(f"汇总统计时ArcGIS工具执行错误: {arcpy.GetMessages(2)}")
            return None
        except Exception as e:
            self.logger.error(f"汇总统计过程中发生错误: {str(e)}")
            return None

    def _save_final_results(self, enriched_points_fc: str, summary_table: str):
        """Persist the enriched points and the summary table into the output GDB."""
        self.logger.info("正在保存最终分析结果...")

        # Enriched sample points.
        self.final_enriched_points_path = self.data_manager.save_output_feature_class(
            enriched_points_fc, self.config.OUTPUT_FC_NAME
        )
        if self.final_enriched_points_path:
            self.logger.info(f"增强样点已保存到: {self.final_enriched_points_path}")
        else:
            self.logger.error("增强样点保存失败。")

        # Summary statistics table (overwrite semantics).
        output_table_path = os.path.join(self.config.OUTPUT_GDB, self.config.SUMMARY_TABLE_NAME)
        try:
            if arcpy.Exists(output_table_path):
                arcpy.management.Delete(output_table_path)
                self.logger.warning(f"已删除现有统计表: {output_table_path}")
            arcpy.management.CopyRows(summary_table, output_table_path)
            self.summary_table_path = output_table_path
            self.logger.info(f"统计结果表已保存到: {self.summary_table_path}")
        except arcpy.ExecuteError:
            self.logger.error(f"保存统计结果表时ArcGIS工具执行错误: {arcpy.GetMessages(2)}")
        except Exception as e:
            self.logger.error(f"保存统计结果表过程中发生错误: {str(e)}")

    def _cleanup_in_memory(self):
        """Delete everything in the ArcPy in-memory workspace (best effort)."""
        self.logger.info("正在清理内存中的临时数据...")
        try:
            arcpy.management.Delete("in_memory")
            self.logger.info("内存工作空间已清理。")
        except Exception as e:
            self.logger.warning(f"清理内存工作空间时出错: {str(e)}")
# src/config.py
import os

# Absolute path of the project root (parent of the src/ package).
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

# --- Input data -------------------------------------------------------------
# Real data GDBs/folders are expected under <project_root>/data/.
INPUT_DATA_GDB = os.path.join(PROJECT_ROOT, "data", "土壤属性统计表格数据.gdb")  # sample points etc.
BASE_DATA_GDB = os.path.join(PROJECT_ROOT, "data", "土壤属性统计表格数据.gdb")   # base layers (admin, land use, soil type)

# --- Output paths -----------------------------------------------------------
OUTPUT_GDB = os.path.join(PROJECT_ROOT, "output", "analysis_result.gdb")
OUTPUT_REPORT_EXCEL = os.path.join(PROJECT_ROOT, "output", "soil_analysis_report.xlsx")
LOG_FILE = os.path.join(PROJECT_ROOT, "output", "process.log")

# --- Spatial analysis -------------------------------------------------------
OUTPUT_FC_NAME = "三普样点合并结果"   # name of the enriched points feature class
SUMMARY_TABLE_NAME = "土壤属性统计表"  # name of the summary statistics table

# Polygon layers used for spatial joins, mapped to the attribute fields to
# copy onto the sample points: {layer name: [field names]}.
JOIN_POLYGON_LAYERS = {
    "兴宁区乡镇行政边界": ["XZQMC"],  # administrative boundaries → region name
    "地类图斑": ["DLBM"],             # land-use polygons → land-use code
    "土壤类型图": ["YL", "TS"],       # soil-type map → soil subclass/species
}

# Soil-attribute fields retained after the spatial joins.
KEEP_SOIL_FIELDS = [
    "AB", "ACU", "AMN", "AMO", "AS1", "AZN", "CEC", "ECA", "EMG", "TESE", "TN", "TP",
    "TK", "AFE", "AK", "AP", "OM", "FL", "SL", "NL", "PH", "GZCHD", "YXTCHD", "TRRZ", "TRZD",
    "YL", "TS",
]

# --- Statistics -------------------------------------------------------------
# Fields to compute statistics for.
STATISTICS_FIELDS = [
    "PH", "OM", "TN", "TP", "TK", "CEC", "AB", "ACU", "AMN", "AMO", "AS1", "AZN",
    "ECA", "EMG", "TESE", "AFE", "AK", "AP", "FL", "SL", "NL",
    "GZCHD", "YXTCHD", "TRRZ", "TRZD",
]
# Statistic types (arcpy also supports "SUM", "COUNT", "STD", "VAR", ...).
STATISTICS_TYPE = ["MEAN", "MAX", "MIN", "COUNT"]

# Group-by (case) fields for statistics and the report.
GROUP_BY_FIELDS = ["XZQMC", "DLBM", "TS"]  # region name, land-use code, soil type

# --- Logging ----------------------------------------------------------------
LOGGING_LEVEL = "INFO"  # DEBUG, INFO, WARNING, ERROR, CRITICAL
# src/reporting/report_generator.py
import pandas as pd
import os
from src.utils.logger_setup import logger
from src import config
from src.analysis.data_manager import DataManager
from openpyxl import Workbook
from openpyxl.utils.dataframe import dataframe_to_rows
from openpyxl.styles import Font, Border, Side, Alignment
from openpyxl.worksheet.dimensions import ColumnDimension, DimensionHolder
from openpyxl.utils import get_column_letter


class ReportGenerator:
    """
    Turns the analysis result table stored in the GDB into a styled
    Excel report.
    """

    def __init__(self, config_module):
        self.logger = logger
        self.config = config_module
        self.data_manager = DataManager()
        self.logger.info("ReportGenerator 初始化。")

    def generate_report(self) -> bool:
        """
        Sole entry point: load the statistics table, format it, and write
        the Excel file.

        Returns:
            bool: True when the report was written successfully.
        """
        self.logger.info("开始生成Excel报告...")
        try:
            # 1. Load the summary table from the output GDB.
            df_stats = self._load_data_from_gdb()
            if df_stats is None or df_stats.empty:
                self.logger.warning("未获取到统计数据或数据为空,无法生成报告。")
                return False

            # 2. Rename/clean columns.
            formatted_df = self._format_data(df_stats)

            # 3. Write and style the workbook.
            self._write_to_excel(formatted_df)

            self.logger.info(f"Excel报告成功生成到: {self.config.OUTPUT_REPORT_EXCEL}")
            return True
        except Exception as e:
            self.logger.error(f"生成Excel报告过程中发生错误: {str(e)}", exc_info=True)
            return False

    def _load_data_from_gdb(self) -> pd.DataFrame | None:
        """Load the summary statistics table from the output GDB."""
        self.logger.info(f"正在从GDB加载统计结果表: {self.config.OUTPUT_GDB}/{self.config.SUMMARY_TABLE_NAME}")
        table_path = os.path.join(self.config.OUTPUT_GDB, self.config.SUMMARY_TABLE_NAME)
        return self.data_manager.gdb_table_to_dataframe(table_path)

    def _format_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Clean and format the statistics table — mainly renaming the statistic
        columns to friendlier ``FIELD_stat`` display names.
        """
        self.logger.info("正在格式化统计数据...")

        rename_map = {}
        for field in self.config.STATISTICS_FIELDS:
            for stat_type in self.config.STATISTICS_TYPE:
                display_name = f"{field}_{stat_type.lower()}"  # e.g. PH_mean
                # Bug fix: arcpy's Summary Statistics names output columns
                # "STAT_FIELD" (e.g. MEAN_PH), so the original "FIELD_STAT"
                # keys never matched and the rename was a silent no-op.
                # Register both spellings to stay robust either way.
                rename_map[f"{stat_type}_{field}".upper()] = display_name
                rename_map[f"{field}_{stat_type}".upper()] = display_name

        # Group-by fields keep their names (placeholder for future renames).
        for group_field in self.config.GROUP_BY_FIELDS:
            rename_map.setdefault(group_field, group_field)

        df = df.rename(columns=rename_map)
        self.logger.info("数据格式化完成。")
        return df

    def _write_to_excel(self, df: pd.DataFrame):
        """Write the formatted DataFrame to an Excel file with basic styling."""
        self.logger.info(f"正在将数据写入Excel文件: {self.config.OUTPUT_REPORT_EXCEL}")

        # Make sure the output directory exists.
        output_dir = os.path.dirname(self.config.OUTPUT_REPORT_EXCEL)
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)

        wb = Workbook()
        ws = wb.active
        if ws is None:
            self.logger.error("无法获取活动工作表,报告生成失败。")
            return

        ws.title = self.config.SUMMARY_TABLE_NAME

        # Shared styles: bold centered header, thin borders everywhere.
        thin_border = Border(left=Side(style='thin'), right=Side(style='thin'),
                             top=Side(style='thin'), bottom=Side(style='thin'))
        centered = Alignment(horizontal="center", vertical="center")

        for r_idx, row in enumerate(dataframe_to_rows(df, index=False, header=True), 1):
            ws.append(row)
            for cell in ws[r_idx]:
                if r_idx == 1:
                    cell.font = Font(bold=True)
                cell.alignment = centered
                cell.border = thin_border

        # Auto-size columns. Bug fix: ColumnDimension's min/max are *integer*
        # column indices — the original passed the column letter, which
        # openpyxl rejects.
        dim_holder = DimensionHolder(worksheet=ws)
        for col in range(ws.min_column, ws.max_column + 1):
            letter = get_column_letter(col)
            max_length = max(
                (len(str(cell.value)) for cell in ws[letter] if cell.value is not None),
                default=0,
            )
            adjusted_width = (max_length + 2) * 1.2  # small margin around content
            dim_holder[letter] = ColumnDimension(ws, min=col, max=col, width=adjusted_width)
        ws.column_dimensions = dim_holder

        # Keep the header row visible while scrolling.
        ws.freeze_panes = ws['A2']

        wb.save(self.config.OUTPUT_REPORT_EXCEL)
        self.logger.info("Excel文件保存成功。")
# src/utils/logger_setup.py
import logging
import os
from src import config


def setup_logging():
    """
    Configure the project-wide logging system.

    Messages are written both to the console and to the log file declared
    in ``config.LOG_FILE`` at the level given by ``config.LOGGING_LEVEL``.

    Returns:
        logging.Logger: a module-specific logger (not the root logger),
        intended to be imported by the rest of the project.
    """
    log_file = config.LOG_FILE
    log_level_str = config.LOGGING_LEVEL.upper()

    # Ensure the log directory exists. Bug fix: exist_ok avoids the race
    # between the original exists() check and makedirs(); the truthiness
    # guard skips the call when LOG_FILE has no directory component.
    log_dir = os.path.dirname(log_file)
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)

    # Fall back to INFO when config names an unknown level.
    log_level = getattr(logging, log_level_str, logging.INFO)

    # Configure the root logger: file + console handlers.
    logging.basicConfig(
        level=log_level,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file, encoding='utf-8'),
            logging.StreamHandler()  # mirror to console
        ]
    )

    # Return a named logger rather than the root logger for modular use.
    return logging.getLogger(__name__)


# Configure logging once at import time; other modules import this logger.
logger = setup_logging()