This commit is contained in:
2025-11-10 23:37:05 +08:00
parent b5c3b17a1d
commit d150e99666
7 changed files with 444 additions and 315 deletions

35
main.py
View File

@@ -12,7 +12,7 @@ if project_root not in sys.path:
from src import config
from src.utils.logger_setup import logger
from src.analysis.spatial_analyzer import SpatialAnalyzer
from src.reporting.report_generator import ReportGenerator
from src.analysis.raster_analyzer import RasterAnalyzer # 导入 RasterAnalyzer
def main():
"""
@@ -32,21 +32,28 @@ def main():
sys.exit(1)
# 1. 执行空间分析
logger.info("阶段一:开始执行空间分析...")
analyzer = SpatialAnalyzer(config)
analysis_success = analyzer.execute_analysis()
# logger.info("阶段一:开始执行空间分析...")
# analyzer = SpatialAnalyzer(config)
# analysis_success = analyzer.execute_analysis()
# 2. 如果分析成功,则生成报告
if analysis_success:
logger.info("阶段二空间分析完成开始生成Excel报告...")
reporter = ReportGenerator(config)
report_success = reporter.generate_report()
if report_success:
logger.info("阶段二Excel报告生成完毕")
else:
logger.error("阶段二Excel报告生成失败。")
# 2. 执行栅格分析(求交、制表、合并)
logger.info("阶段二:开始执行栅格分析...")
raster_analyzer = RasterAnalyzer(config)
raster_analysis_success = raster_analyzer.execute_analysis()
# 3. 输出分析结果
# if analysis_success and raster_analysis_success:
# logger.info("所有分析流程成功完成!")
# elif analysis_success and not raster_analysis_success:
# logger.error("空间分析成功,但栅格分析失败,请检查日志。")
# elif not analysis_success and raster_analysis_success:
# logger.error("空间分析失败,但栅格分析成功,请检查日志。")
# else:
# logger.error("所有分析流程失败,请检查日志。")
if raster_analysis_success:
logger.info("栅格分析流程成功完成!")
else:
logger.error("阶段一:空间分析失败,请检查日志。")
logger.error("栅格分析失败,请检查日志。")
logger.info("==================================================")
logger.info(" 地理处理与土壤属性分析项目结束 ")

View File

@@ -2,7 +2,9 @@
import arcpy
import os
import pandas as pd
import pyarrow as pa
from typing import Literal
from pathlib import Path # 导入 Path
from src.utils.logger_setup import logger
from src import config
@@ -16,13 +18,16 @@ class DataManager:
self.output_gdb = config.OUTPUT_GDB
self.logger.info(f"DataManager 初始化输出GDB: {self.output_gdb}")
def get_feature_classes(self, input_location: str, shape_type: Literal['Point', 'Polyline', 'Polygon'] = "Point") -> list:
def get_feature_classes(self, input_location: str, shape_type: Literal['Point', 'Polyline', 'Polygon'] = "Point",
feature_names_to_match: list | None = None) -> list:
"""
从文件夹或GDB中获取所有指定类型的要素类。
如果提供了 feature_names_to_match则只返回名称匹配的要素类。
参数:
input_location: 输入路径文件夹或GDB
shape_type: 要获取的要素类型(默认为"Point"
feature_names_to_match: 可选,要匹配的要素类名称列表。
返回:
list: 要素类路径列表
@@ -58,6 +63,18 @@ class DataManager:
self.logger.error(f"读取输入位置时出错: {str(e)}")
self.logger.info(f"'{input_location}' 找到 {len(feature_class_list)}'{shape_type}' 要素类。")
# 根据 feature_names_to_match 进行筛选
if feature_names_to_match:
filtered_feature_class_list = []
configured_names_set = {name.upper() for name in feature_names_to_match} # 转换为大写进行不区分大小写匹配
for fc_path in feature_class_list:
fc_name = Path(fc_path).stem.upper() # 获取要素类名称并转换为大写
if fc_name in configured_names_set:
filtered_feature_class_list.append(fc_path)
self.logger.info(f"根据配置筛选后,保留 {len(filtered_feature_class_list)} 个要素类。")
return filtered_feature_class_list
return feature_class_list
def save_output_feature_class(self, in_memory_fc: str, output_name: str) -> str | None:
@@ -140,3 +157,97 @@ class DataManager:
except Exception as e:
self.logger.error(f"读取GDB表格到DataFrame时出错: {str(e)}")
return None
def merge_tables_to_dataframe(self, table_paths: list) -> pd.DataFrame | None:
"""
将多个DBF表格读取为Pandas DataFrame并合并。
每个表格的 'gridcode' 列将被重命名为对应的土壤属性名称。
参数:
table_paths: DBF表格路径列表
返回:
pd.DataFrame: 合并后的DataFrame如果失败则返回 None
"""
self.logger.info("正在合并多个制表结果到 DataFrame...")
all_dfs = []
for table_path in table_paths:
df = self.gdb_table_to_dataframe(table_path)
if df is None:
self.logger.warning(f"无法读取表格: {table_path},跳过。")
continue
# 从文件名中提取土壤属性名称
soil_attribute_name = Path(table_path).stem.replace("tabulate_", "")
# 重命名 'gridcode' 列为土壤属性名称
if "gridcode" in df.columns:
df = df.rename(columns={"gridcode": soil_attribute_name})
else:
self.logger.warning(f"表格 '{table_path}' 中缺少 'gridcode' 字段,无法重命名。")
continue
# 重命名 'AREA' 列为 'AREA_{土壤属性名称}',以避免合并时的列名冲突
if "AREA" in df.columns:
df = df.rename(columns={"AREA": f"AREA_{soil_attribute_name}"})
else:
self.logger.warning(f"表格 '{table_path}' 中缺少 'AREA' 字段。")
# 只保留 XZQMC, DLBM, 土壤属性名称 和 AREA_{土壤属性名称} 字段
cols_to_keep = ["XZQMC", "DLBM", soil_attribute_name, f"AREA_{soil_attribute_name}"]
df = df[[col for col in cols_to_keep if col in df.columns]]
all_dfs.append(df)
if not all_dfs:
self.logger.error("没有可合并的 DataFrame。")
return None
# 第一次合并使用第一个DataFrame作为基础
merged_df = all_dfs[0]
for i in range(1, len(all_dfs)):
# 使用 XZQMC 和 DLBM 作为合并键
merged_df = pd.merge(merged_df, all_dfs[i], on=["XZQMC", "DLBM"], how="outer")
self.logger.info(f"成功合并 {len(all_dfs)} 个表格。")
return merged_df
def dataframe_to_gdb_table(self, dataframe: pd.DataFrame, output_table_name: str) -> str | None:
    """Persist a pandas DataFrame as a new table inside the output GDB.

    Creates the file geodatabase first if it is missing, drops any table
    that already has the requested name, then writes the rows via an
    Arrow table handed to CopyRows.

    Args:
        dataframe: the DataFrame to write.
        output_table_name: name of the table to create inside the GDB.

    Returns:
        Full path of the created table, or None on failure.
    """
    self.logger.info(f"正在将 DataFrame 写入到 GDB 表 '{output_table_name}'...")
    table_path = os.path.join(self.output_gdb, output_table_name)
    try:
        # Make sure the destination file geodatabase exists before writing.
        if not arcpy.Exists(self.output_gdb):
            folder = os.path.dirname(self.output_gdb)
            name = os.path.basename(self.output_gdb).replace('.gdb', '')
            arcpy.management.CreateFileGDB(folder, name)
            self.logger.info(f"创建GDB: {self.output_gdb}")
        # Overwrite semantics: remove a pre-existing table of the same name.
        if arcpy.Exists(table_path):
            arcpy.management.Delete(table_path)
            self.logger.warning(f"已删除现有表: {table_path}")
        # NOTE(review): CopyRows taking a pyarrow Table directly assumes a
        # recent ArcGIS Pro release — confirm against the deployed version.
        arcpy.management.CopyRows(pa.Table.from_pandas(dataframe), table_path)
        self.logger.info(f"DataFrame 已成功写入到 GDB 表: {table_path}")
        return table_path
    except arcpy.ExecuteError:
        self.logger.error(f"将 DataFrame 写入 GDB 表时ArcGIS工具执行错误: {arcpy.GetMessages(2)}")
        return None
    except Exception as e:
        self.logger.error(f"将 DataFrame 写入 GDB 表过程中发生错误: {str(e)}")
        return None

View File

@@ -0,0 +1,183 @@
# src/analysis/raster_analyzer.py
import arcpy
import os
import pandas as pd
from pathlib import Path
from src.utils.logger_setup import logger
from src.analysis.data_manager import DataManager
from src import config
class RasterAnalyzer:
    """
    Runs the soil-attribute raster analysis workflow: intersects each
    reclassified attribute polygon layer with the land-class parcels,
    tabulates areas per administrative region, and merges all tabulate
    results into a single summary table in the output GDB.
    """
    def __init__(self, config_module):
        # Shared project logger and the configuration module injected by main().
        self.logger = logger
        self.config = config_module
        self.data_manager = DataManager()
        self.logger.info("RasterAnalyzer 初始化。")
        arcpy.env.overwriteOutput = True
        arcpy.env.workspace = "in_memory"  # default workspace is in-memory for temp data
        # Input/output locations, all taken from the configuration module.
        self.dltb_polygon = self.config.DLTB_POLYGON_PATH
        self.xzq_polygon = self.config.XZQ_POLYGON_PATH
        self.reclassified_polygon_path = self.config.RECLASSIFIED_POLYGON_PATH
        self.output_gdb = self.config.OUTPUT_GDB
        self.final_summary_table_name = self.config.FINAL_SUMMARY_TABLE_NAME
        # Fail fast if any required input feature is missing.
        if not arcpy.Exists(self.dltb_polygon):
            self.logger.error(f"地类图斑要素不存在: {self.dltb_polygon}")
            raise FileNotFoundError(f"地类图斑要素不存在: {self.dltb_polygon}")
        if not arcpy.Exists(self.xzq_polygon):
            self.logger.error(f"行政区划要素不存在: {self.xzq_polygon}")
            raise FileNotFoundError(f"行政区划要素不存在: {self.xzq_polygon}")
        if not arcpy.Exists(self.reclassified_polygon_path):
            self.logger.error(f"重分类面要素路径不存在: {self.reclassified_polygon_path}")
            raise FileNotFoundError(f"重分类面要素路径不存在: {self.reclassified_polygon_path}")

    def execute_analysis(self) -> bool:
        """
        Main driver of the raster analysis.

        For every reclassified polygon layer: intersect it with the
        land-class parcels, tabulate the intersection per administrative
        region, then merge all tabulate tables into one DataFrame and
        persist it to the output GDB.

        Returns:
            bool: True when the workflow completed, False on failure.
        """
        self.logger.info("开始执行栅格分析流程...")
        temp_dbf_tables = []  # per-attribute tabulate results (in-memory tables, despite the name)
        try:
            # Collect every reclassified polygon feature class to process.
            reclassed_polygons = self.data_manager.get_feature_classes(
                self.reclassified_polygon_path,
                shape_type="Polygon"
            )
            if not reclassed_polygons:
                self.logger.error(f"'{self.reclassified_polygon_path}' 中未找到任何重分类后的面要素。")
                return False
            for reclassed_polygon in reclassed_polygons:
                polygon_name = Path(reclassed_polygon).stem
                self.logger.info(f"正在处理土壤属性: {polygon_name}")
                # 1. Intersect land-class parcels with this attribute's polygons.
                intersect_output = self._intersect_features(reclassed_polygon, polygon_name)
                if not intersect_output:
                    self.logger.error(f"土壤属性 '{polygon_name}' 的交集分析失败。")
                    continue
                # 2. Tabulate the intersection result.
                dbf_table = self._tabulate_intersection(intersect_output, polygon_name)
                if dbf_table:
                    temp_dbf_tables.append(dbf_table)
                else:
                    self.logger.error(f"土壤属性 '{polygon_name}' 的制表失败。")
                # Drop the in-memory intersection result; only the table is kept.
                arcpy.management.Delete(intersect_output)
            if not temp_dbf_tables:
                self.logger.error("没有生成任何制表结果,分析终止。")
                return False
            # 3. Merge all tabulate results and save the summary to the GDB.
            self.logger.info("正在合并所有制表结果...")
            final_dataframe = self.data_manager.merge_tables_to_dataframe(temp_dbf_tables)
            if final_dataframe is None:
                self.logger.error("合并制表结果失败。")
                return False
            output_table_path = self.data_manager.dataframe_to_gdb_table(
                final_dataframe, self.final_summary_table_name
            )
            # NOTE(review): a failed table save only logs an error here; the
            # method still falls through and returns True.
            if output_table_path:
                self.logger.info(f"最终汇总表已保存到: {output_table_path}")
            else:
                self.logger.error("最终汇总表保存失败。")
            self.logger.info("栅格分析流程成功完成。")
            return True
        except arcpy.ExecuteError:
            self.logger.error(f"ArcGIS工具执行错误: {arcpy.GetMessages(2)}")
            return False
        except Exception as e:
            self.logger.error(f"栅格分析过程中发生未知错误: {str(e)}", exc_info=True)
            return False
        finally:
            # Clear in-memory temp data — runs on success and failure alike,
            # and only after the merge above has consumed the temp tables.
            self._cleanup_in_memory_and_temp_files(temp_dbf_tables)

    def _intersect_features(self, reclassed_polygon: str, polygon_name: str) -> str | None:
        """
        Intersect the land-class parcels with one reclassified polygon layer.

        Args:
            reclassed_polygon: path of the reclassified polygon feature class.
            polygon_name: attribute name, used for the output name and logging.

        Returns:
            Path of the in-memory intersection result, or None on failure.
        """
        self.logger.info(f"开始求 '{polygon_name}' 与地类图斑的交集...")
        out_feature_class = f"in_memory/intersect_{polygon_name}"
        try:
            in_features = [self.dltb_polygon, reclassed_polygon]
            arcpy.analysis.Intersect(
                in_features=in_features,
                out_feature_class=out_feature_class,
                join_attributes="ALL",
                output_type="INPUT"
            )
            self.logger.info(f"'{polygon_name}' 交集结果已生成: {out_feature_class}")
            return out_feature_class
        except arcpy.ExecuteError:
            self.logger.error(f"'{polygon_name}' 交集时ArcGIS工具执行错误: {arcpy.GetMessages(2)}")
            return None
        except Exception as e:
            self.logger.error(f"'{polygon_name}' 交集过程中发生错误: {str(e)}")
            return None

    def _tabulate_intersection(self, intersect_feature_class: str, polygon_name: str) -> str | None:
        """
        Tabulate the intersection result by administrative region.

        Args:
            intersect_feature_class: the intersection feature class to tabulate.
            polygon_name: attribute name, used for the output name and logging.

        Returns:
            Path of the in-memory tabulate result, or None on failure.
        """
        self.logger.info(f"开始对 '{polygon_name}' 的交集结果进行制表...")
        # The tabulate output is kept in the in-memory workspace.
        out_table = f"in_memory/tabulate_{polygon_name}"
        try:
            # The zone features must expose the XZQMC (region name) field.
            if not arcpy.ListFields(self.xzq_polygon, "XZQMC"):
                self.logger.error(f"行政区划要素 '{self.xzq_polygon}' 中缺少 'XZQMC' 字段。")
                return None
            # The intersection must carry both 'gridcode' and 'DLBM' class fields.
            intersect_fields = [f.name for f in arcpy.ListFields(intersect_feature_class)]
            if "gridcode" not in intersect_fields:
                self.logger.error(f"交集要素 '{intersect_feature_class}' 中缺少 'gridcode' 字段。")
                return None
            if "DLBM" not in intersect_fields:
                self.logger.error(f"交集要素 '{intersect_feature_class}' 中缺少 'DLBM' 字段。")
                return None
            arcpy.analysis.TabulateIntersection(
                in_zone_features=self.xzq_polygon,  # township boundaries
                zone_fields="XZQMC",
                in_class_features=intersect_feature_class,
                out_table=out_table,
                class_fields="gridcode;DLBM",
                out_units="SQUARE_METERS"
            )
            self.logger.info(f"'{polygon_name}' 制表结果已生成: {out_table}")
            return out_table
        except arcpy.ExecuteError:
            self.logger.error(f"'{polygon_name}' 制表时ArcGIS工具执行错误: {arcpy.GetMessages(2)}")
            return None
        except Exception as e:
            self.logger.error(f"'{polygon_name}' 制表过程中发生错误: {str(e)}")
            return None

    def _cleanup_in_memory_and_temp_files(self, temp_tables: list):
        """
        Clear all temporary data held in the ArcPy in-memory workspace.

        Args:
            temp_tables: tabulate-result paths; currently unused, since
                deleting the whole in-memory workspace removes them as well.
        """
        self.logger.info("正在清理内存中的临时数据...")
        try:
            # Deleting "in_memory" drops every temp dataset in that workspace.
            arcpy.management.Delete("in_memory")
            self.logger.info("内存工作空间已清理。")
        except Exception as e:
            self.logger.warning(f"清理临时数据时出错: {str(e)}")

View File

@@ -1,14 +1,15 @@
# src/analysis/spatial_analyzer.py
import itertools
import arcpy
import os
from pathlib import Path
from src.utils.logger_setup import logger
from src import config
from src.analysis.data_manager import DataManager
class SpatialAnalyzer:
"""
封装项目最核心的空间处理和统计分析逻辑。
这是业务逻辑的中心,负责样点属性增强、空间连接和汇总统计
这是业务逻辑的中心,负责样点属性增强、空间连接。
"""
def __init__(self, config_module):
self.logger = logger
@@ -21,7 +22,6 @@ class SpatialAnalyzer:
arcpy.env.workspace = "in_memory" # 默认工作空间设置为内存,方便临时数据管理
self.final_enriched_points_path = None # 存储最终增强样点的路径
self.summary_table_path = None # 存储最终统计表的路径
def execute_analysis(self) -> bool:
"""
@@ -47,26 +47,26 @@ class SpatialAnalyzer:
self.logger.error("合并与去重失败,分析终止。")
return False
# 4. 属性连接 (多边形属性)
enriched_points = self._enrich_attributes_by_spatial_join(merged_deduplicated_points)
# 4. 与土壤属性点进行空间连接
enriched_with_soil_points = self._enrich_with_soil_attributes(merged_deduplicated_points)
if not enriched_with_soil_points:
self.logger.error("与土壤属性点空间连接失败,分析终止。")
return False
# 5. 属性连接 (多边形属性)
enriched_points = self._enrich_attributes_by_spatial_join(enriched_with_soil_points)
if not enriched_points:
self.logger.error("空间连接失败,分析终止。")
return False
# 5. 清理字段 (保留指定字段)
final_points_for_stats = self._cleanup_fields(enriched_points)
if not final_points_for_stats:
# 6. 清理字段 (保留指定字段)
final_points = self._cleanup_fields(enriched_points)
if not final_points:
self.logger.error("字段清理失败,分析终止。")
return False
# 6. 汇总统计
summary_table = self._calculate_summary_statistics(final_points_for_stats)
if not summary_table:
self.logger.error("汇总统计失败,分析终止。")
return False
# 7. 保存最终结果
self._save_final_results(final_points_for_stats, summary_table)
self._save_final_results(final_points)
self.logger.info("空间分析流程成功完成。")
return True
@@ -87,15 +87,15 @@ class SpatialAnalyzer:
self.logger.info("执行预分析检查...")
# 1. 检查输入GDB是否存在
if not arcpy.Exists(self.config.INPUT_DATA_GDB):
self.logger.error(f"输入数据GDB不存在: {self.config.INPUT_DATA_GDB}")
if not arcpy.Exists(self.config.INPUT_DATA_PATH):
self.logger.error(f"输入数据GDB不存在: {self.config.INPUT_DATA_PATH}")
return False
if not arcpy.Exists(self.config.BASE_DATA_GDB):
self.logger.error(f"基础数据GDB不存在: {self.config.BASE_DATA_GDB}")
return False
# 2. 检查用于空间连接的多边形图层及其字段是否存在
for layer_name, field_names in self.config.JOIN_POLYGON_LAYERS.items():
for layer_name, field_names in self.config.JOIN_POLYGON_FEATURES.items():
layer_path = os.path.join(self.config.BASE_DATA_GDB, layer_name)
if not arcpy.Exists(layer_path):
self.logger.error(f"空间连接图层不存在: {layer_path}")
@@ -113,9 +113,13 @@ class SpatialAnalyzer:
从配置的输入GDB中获取所有点要素类。
"""
self.logger.info("正在获取初始点要素类...")
point_features = self.data_manager.get_feature_classes(self.config.INPUT_DATA_GDB, shape_type="Point")
point_features = self.data_manager.get_feature_classes(
self.config.INPUT_DATA_PATH,
shape_type="Point",
feature_names_to_match=list(self.config.JOIN_POINT_FEATURES.keys()) # 传入需要匹配的要素类名称
)
if not point_features:
self.logger.warning(f"'{self.config.INPUT_DATA_GDB}' 中未找到任何点要素类。")
self.logger.warning(f"'{self.config.INPUT_DATA_PATH}' 中未找到任何配置的点要素类。")
return None
return point_features
@@ -158,6 +162,51 @@ class SpatialAnalyzer:
self.logger.error(f"合并或去重过程中发生错误: {str(e)}")
return None
def _enrich_with_soil_attributes(self, target_features: str) -> str | None:
    """Chain spatial joins against every configured soil-attribute point layer.

    Each point feature class found under INPUT_DATA_PATH whose name is a key
    of JOIN_POINT_FEATURES is joined onto the running result with a 1-metre
    CLOSEST match, so the sample points pick up the soil attribute values.

    Args:
        target_features: path of the point feature class to enrich.

    Returns:
        Path of the enriched (in-memory) feature class, the unchanged input
        when no point layers were found, or None when a join failed.
    """
    self.logger.info("开始与土壤属性点进行空间连接...")
    enriched = target_features

    # Collect every candidate soil-attribute point layer.
    candidates = self.data_manager.get_feature_classes(
        self.config.INPUT_DATA_PATH, shape_type="Point"
    )
    if not candidates:
        self.logger.warning("未找到任何土壤属性点要素类")
        return target_features

    for candidate in candidates:
        fc_name = Path(candidate).stem  # also used for the temp output name
        # Skip layers that vanished or are not configured for joining.
        usable = arcpy.Exists(candidate) and fc_name in self.config.JOIN_POINT_FEATURES
        if not usable:
            self.logger.warning(f"土壤属性点要素类不存在,跳过: {candidate}")
            continue
        staged = f"in_memory/joined_soil_{fc_name}"
        try:
            arcpy.analysis.SpatialJoin(
                target_features=enriched,
                join_features=candidate,
                out_feature_class=staged,
                join_operation="JOIN_ONE_TO_ONE",
                join_type="KEEP_ALL",
                match_option="CLOSEST",
                search_radius="1 Meters",  # tight radius so only co-located points match
            )
            enriched = staged
        except arcpy.ExecuteError:
            self.logger.error(f"与土壤属性点 '{fc_name}' 空间连接时ArcGIS工具执行错误: {arcpy.GetMessages(2)}")
            return None
        except Exception as e:
            self.logger.error(f"与土壤属性点 '{fc_name}' 空间连接过程中发生错误: {str(e)}")
            return None

    self.logger.info("所有土壤属性点空间连接完成。")
    return enriched
def _enrich_attributes_by_spatial_join(self, target_features: str) -> str | None:
"""
通过空间连接为样点赋值多边形属性(行政区划、土地利用、土壤类型)。
@@ -165,47 +214,37 @@ class SpatialAnalyzer:
self.logger.info("开始执行空间连接,为样点赋值多边形属性...")
current_data = target_features
# 获取面要素类并进行空间连接
polygon_features = self.data_manager.get_feature_classes(self.config.BASE_DATA_GDB, shape_type="Polygon")
if not polygon_features:
self.logger.warning("未找到任何多边形要素类,跳过空间连接步骤。")
return target_features
# 遍历配置中需要连接的多边形图层
for layer_name, field_names in self.config.JOIN_POLYGON_LAYERS.items():
join_feature_path = os.path.join(self.config.BASE_DATA_GDB, layer_name)
for join_polygon in polygon_features:
polygon_name = Path(join_polygon).stem
if not arcpy.Exists(join_feature_path):
self.logger.warning(f"警告: 连接要素不存在,跳过: {join_feature_path}")
if not arcpy.Exists(join_polygon) or polygon_name not in self.config.JOIN_POLYGON_FEATURES:
self.logger.warning(f"警告: 连接要素不存在,跳过: {join_polygon}")
continue
temp_output = f"in_memory/joined_{layer_name}"
self.logger.info(f"正在将 '{current_data}''{layer_name}' ({join_feature_path}) 进行空间连接...")
try:
# 构建字段映射,只保留需要连接的字段
field_mappings = arcpy.FieldMappings()
field_mappings.addTable(current_data) # 添加目标要素的字段
# 添加连接要素的字段
join_fields_to_add = []
for field_name in field_names:
field_map = arcpy.FieldMap()
field_map.addInputField(join_feature_path, field_name)
field_mappings.addFieldMap(field_map)
join_fields_to_add.append(field_name) # 记录要添加的字段名
temp_output = f"in_memory/joined_poly_{polygon_name}"
arcpy.analysis.SpatialJoin(
target_features=current_data,
join_features=join_feature_path,
join_features=join_polygon,
out_feature_class=temp_output,
join_operation="JOIN_ONE_TO_ONE",
join_type="KEEP_ALL",
match_option="CLOSEST", # 样点与多边形通常使用相交
search_radius="500 Meters",
field_mapping=field_mappings # 使用字段映射来控制输出字段
match_option="CLOSEST",
search_radius="500 Meters"
)
current_data = temp_output
self.logger.info(f"'{layer_name}' 连接完成,连接字段: {', '.join(join_fields_to_add)}")
except arcpy.ExecuteError:
self.logger.error(f"'{layer_name}' 空间连接时ArcGIS工具执行错误: {arcpy.GetMessages(2)}")
self.logger.error(f"'{polygon_name}' 空间连接时ArcGIS工具执行错误: {arcpy.GetMessages(2)}")
return None
except Exception as e:
self.logger.error(f"'{layer_name}' 空间连接过程中发生错误: {str(e)}")
self.logger.error(f"'{polygon_name}' 空间连接过程中发生错误: {str(e)}")
return None
self.logger.info("所有空间连接完成。")
@@ -222,30 +261,31 @@ class SpatialAnalyzer:
arcpy.management.CopyFeatures(feature_class, output_fc)
# 构建要保留的字段列表
fields_to_keep = ["Geometry", "OID", "SHAPE", "OBJECTID", "FID","OBJECTID_1"] # 几何和OID字段
fields_to_keep.extend(self.config.KEEP_SOIL_FIELDS) # 配置中指定的土壤属性字段
fields_to_keep.extend(self.config.GROUP_BY_FIELDS) # 分组字段
# 获取当前要素类中的所有字段名
all_fields = [f.name for f in arcpy.ListFields(output_fc)]
fields_to_keep = [*list(self.config.JOIN_POINT_FEATURES.values()), *list(self.config.JOIN_POLYGON_FEATURES.values())] # 配置中指定的属性字段
fields_to_keep = [*set(itertools.chain.from_iterable(fields_to_keep))] # 扁平化列表
self.logger.info(f"配置中指定需要保留的字段: {fields_to_keep}")
# 确定需要删除的字段
fields_to_delete = []
for field in all_fields:
# 忽略系统字段和要保留的字段
if field.upper() not in [f.upper() for f in fields_to_keep]:
fields_to_delete.append(field)
delete_fields = []
keeped_fields = []
for field in arcpy.ListFields(output_fc):
# 忽略系统字段
if field.type in ("Geometry", "OID"):
continue
if field.name.upper() not in [f.upper() for f in fields_to_keep]:
delete_fields.append(field.name)
else:
keeped_fields.append(field.name)
if fields_to_delete:
arcpy.management.DeleteField(output_fc, fields_to_delete)
self.logger.info(f"已删除字段: {', '.join(fields_to_delete)}")
if delete_fields:
arcpy.management.DeleteField(output_fc, delete_fields)
else:
self.logger.info("无需删除额外字段。")
# 检查是否有配置中指定的字段在最终要素类中缺失
missing_fields = [f for f in self.config.KEEP_SOIL_FIELDS + self.config.GROUP_BY_FIELDS if not arcpy.ListFields(output_fc, f)]
missing_fields = set(fields_to_keep) - set(keeped_fields)
if missing_fields:
self.logger.warning(f"警告: 以下配置的字段在清理后缺失: {', '.join(missing_fields)}")
self.logger.warning(f"警告: 以下配置的字段缺失: {', '.join(missing_fields)}")
return output_fc
except arcpy.ExecuteError:
@@ -255,77 +295,21 @@ class SpatialAnalyzer:
self.logger.error(f"字段清理过程中发生错误: {str(e)}")
return None
def _calculate_summary_statistics(self, feature_class: str) -> str | None:
def _save_final_results(self, enriched_points_fc: str):
"""
对样点数据按指定字段进行分组汇总统计
"""
self.logger.info("开始计算汇总统计...")
output_table = os.path.join("in_memory", self.config.SUMMARY_TABLE_NAME)
# 准备统计字段列表
statistics_fields = []
exists_field = [f.name for f in arcpy.ListFields(feature_class)]
exists_field.remove("TRZD")
for field in self.config.STATISTICS_FIELDS:
if field not in exists_field:
continue
for stat_type in self.config.STATISTICS_TYPE:
statistics_fields.append([field, stat_type])
if not statistics_fields:
self.logger.warning("未配置任何统计字段或统计类型,跳过汇总统计。")
return None
try:
# 检查分组字段是否存在
for group_field in self.config.GROUP_BY_FIELDS:
if not arcpy.ListFields(feature_class, group_field):
self.logger.error(f"分组字段 '{group_field}' 不存在于要素类中,无法进行统计。")
return None
arcpy.analysis.Statistics(
in_table=feature_class,
out_table=output_table,
statistics_fields=statistics_fields,
case_field=self.config.GROUP_BY_FIELDS
)
self.logger.info(f"汇总统计完成,结果表: {output_table}")
return output_table
except arcpy.ExecuteError:
self.logger.error(f"汇总统计时ArcGIS工具执行错误: {arcpy.GetMessages(2)}")
return None
except Exception as e:
self.logger.error(f"汇总统计过程中发生错误: {str(e)}")
return None
def _save_final_results(self, enriched_points_fc: str, summary_table: str):
"""
保存最终增强后的样点要素类和统计结果表到输出GDB。
保存最终样点要素类到输出GDB
"""
self.logger.info("正在保存最终分析结果...")
# 保存增强后的样点
# 保存最终样点
self.final_enriched_points_path = self.data_manager.save_output_feature_class(
enriched_points_fc, self.config.OUTPUT_FC_NAME
)
if self.final_enriched_points_path:
self.logger.info(f"增强样点已保存到: {self.final_enriched_points_path}")
self.logger.info(f"最终样点已保存到: {self.final_enriched_points_path}")
else:
self.logger.error("增强样点保存失败。")
# 保存统计结果表
output_table_path = os.path.join(self.config.OUTPUT_GDB, self.config.SUMMARY_TABLE_NAME)
try:
if arcpy.Exists(output_table_path):
arcpy.management.Delete(output_table_path)
self.logger.warning(f"已删除现有统计表: {output_table_path}")
arcpy.management.CopyRows(summary_table, output_table_path)
self.summary_table_path = output_table_path
self.logger.info(f"统计结果表已保存到: {self.summary_table_path}")
except arcpy.ExecuteError:
self.logger.error(f"保存统计结果表时ArcGIS工具执行错误: {arcpy.GetMessages(2)}")
except Exception as e:
self.logger.error(f"保存统计结果表过程中发生错误: {str(e)}")
self.logger.error("最终样点保存失败。")
def _cleanup_in_memory(self):
"""

View File

@@ -5,45 +5,35 @@ import os
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# 输入数据路径
# 建议将实际数据GDB或文件夹放在 project_root/data/ 目录下
INPUT_DATA_GDB = os.path.join(PROJECT_ROOT, "data", "土壤属性统计表格数据.gdb") # 示例:输入样点、行政区划、土地利用、土壤类型数据所在的GDB
BASE_DATA_GDB = os.path.join(PROJECT_ROOT, "data", "土壤属性统计表格数据.gdb") # 示例:基础数据,如行政区划、土地利用、土壤类型
INPUT_DATA_PATH = r"D:\工作\三普成果编制\出图数据\广西兴宁区\土壤属性图数据\土壤属性统计表格数据.gdb" # 示例输入样点、行政区划、土地利用、土壤类型数据所在的GDB
BASE_DATA_GDB = r"D:\工作\三普成果编制\出图数据\广西兴宁区\土壤属性图数据\土壤属性统计表格数据.gdb" # 示例:基础数据,如行政区划、土地利用、土壤类型
# 输出数据路径
OUTPUT_GDB = os.path.join(PROJECT_ROOT, "output", "analysis_result.gdb")
OUTPUT_REPORT_EXCEL = os.path.join(PROJECT_ROOT, "output", "soil_analysis_report.xlsx")
LOG_FILE = os.path.join(PROJECT_ROOT, "output", "process.log")
# 空间分析相关配置
OUTPUT_FC_NAME = "三普样点合并结果" # 空间连接后的样点要素类名称
SUMMARY_TABLE_NAME = "土壤属性统计" # 统计结果表名称
FINAL_SUMMARY_TABLE_NAME = "土壤属性分级汇总" # 最终输出的合并表的名称
# 栅格分析相关配置
RECLASSIFIED_POLYGON_PATH = r"D:\工作\三普成果编制\出图数据\广西兴宁区\土壤属性图数据\面积统计用栅格面\新建文件夹" # 示例重分类后的面要素所在的GDB或文件夹路径
DLTB_POLYGON_PATH = r"D:\工作\三普成果编制\出图数据\广西兴宁区\土壤属性图数据\土壤属性统计表格数据.gdb\地类图斑" # 示例:地类图斑要素的完整路径
XZQ_POLYGON_PATH = r"D:\工作\三普成果编制\出图数据\广西兴宁区\土壤属性图数据\土壤属性统计表格数据.gdb\行政区划" # 示例:行政区划要素的完整路径
# 空间连接字段配置
# 样点需要连接的多边形图层名称及其属性字段
# 例如:{"行政区划图层名称": "行政区划名称字段", "土地利用图层名称": "土地利用类型字段", ...}
JOIN_POLYGON_LAYERS = {
"兴宁区乡镇行政边界": ["XZQMC"], # 行政区划图层名和对应的行政区名称字段列表
"地类图斑": ["DLBM"], # 土地利用图层名和对应的土地利用类型字段列表
"土壤类型图": ["YL","TS"] # 土壤类型图层名和对应的土壤类型字段列表
# 点要素及保留字段
JOIN_POINT_FEATURES = {
"PH": ["PH"],
"OM": ["OM"],
"AK": ["AK"]
}
# 面要素及保留字段
JOIN_POLYGON_FEATURES = {
"兴宁区乡镇行政边界": ["XZQMC"],
"地类图斑": ["DLBM"],
"土壤类型图": ["YL","TS","TZ"]
}
# 需要保留的土壤属性字段列表 (这些字段将从空间连接结果中筛选保留)
KEEP_SOIL_FIELDS = [
"AB", "ACU", "AMN", "AMO", "AS1", "AZN", "CEC", "ECA", "EMG", "TESE", "TN", "TP",
"TK", "AFE", "AK", "AP", "OM", "FL", "SL", "NL", "PH", "GZCHD", "YXTCHD", "TRRZ", "TRZD",
"YL", "TS"
]
# 统计分析字段配置
# 需要进行统计的土壤属性字段
STATISTICS_FIELDS = [
"PH", "OM", "TN", "TP", "TK", "CEC", "AB", "ACU", "AMN", "AMO", "AS1", "AZN", "ECA", "EMG", "TESE", "AFE", "AK", "AP", "FL", "SL", "NL", "GZCHD", "YXTCHD", "TRRZ", "TRZD"
]
# 统计类型 (例如: "MEAN", "MAX", "MIN", "SUM", "COUNT", "STD", "VAR")
STATISTICS_TYPE = ["MEAN", "MAX", "MIN", "COUNT"]
# 分组字段 (用于统计分析和报告生成)
GROUP_BY_FIELDS = ["XZQMC", "DLBM", "TS"] # 行政区名称, 土地利用类型, 土壤类型
# 日志配置
LOGGING_LEVEL = "INFO" # DEBUG, INFO, WARNING, ERROR, CRITICAL

View File

@@ -1 +0,0 @@
# src/reporting/__init__.py

View File

@@ -1,145 +0,0 @@
# src/reporting/report_generator.py
import pandas as pd
import os
from src.utils.logger_setup import logger
from src import config
from src.analysis.data_manager import DataManager
from openpyxl import Workbook
from openpyxl.utils.dataframe import dataframe_to_rows
from openpyxl.styles import Font, Border, Side, Alignment
from openpyxl.worksheet.dimensions import ColumnDimension, DimensionHolder
from openpyxl.utils import get_column_letter
class ReportGenerator:
    """
    Turns the statistics table stored in the analysis-result GDB into a
    styled Excel report.
    """
    def __init__(self, config_module):
        # Shared project logger and the configuration module injected by main().
        self.logger = logger
        self.config = config_module
        self.data_manager = DataManager()
        self.logger.info("ReportGenerator 初始化。")

    def generate_report(self) -> bool:
        """
        Single entry point for report generation: load the statistics table,
        format it, and write the styled Excel file.

        Returns:
            bool: True when the report was written, False on failure.
        """
        self.logger.info("开始生成Excel报告...")
        try:
            # 1. Load the summary statistics table from the GDB.
            df_stats = self._load_data_from_gdb()
            if df_stats is None or df_stats.empty:
                self.logger.warning("未获取到统计数据或数据为空,无法生成报告。")
                return False
            # 2. Format the data (column renaming etc.).
            formatted_df = self._format_data(df_stats)
            # 3. Write to Excel and apply styling.
            self._write_to_excel(formatted_df)
            self.logger.info(f"Excel报告成功生成到: {self.config.OUTPUT_REPORT_EXCEL}")
            return True
        except Exception as e:
            self.logger.error(f"生成Excel报告过程中发生错误: {str(e)}", exc_info=True)
            return False

    def _load_data_from_gdb(self) -> pd.DataFrame | None:
        """
        Load the statistics result table from the output GDB.

        Returns:
            The table as a DataFrame, or None if it could not be read.
        """
        self.logger.info(f"正在从GDB加载统计结果表: {self.config.OUTPUT_GDB}/{self.config.SUMMARY_TABLE_NAME}")
        table_path = os.path.join(self.config.OUTPUT_GDB, self.config.SUMMARY_TABLE_NAME)
        df = self.data_manager.gdb_table_to_dataframe(table_path)
        return df

    def _format_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Clean and format the statistics data, e.g. rename the generated
        statistic columns to a friendlier form.
        """
        self.logger.info("正在格式化统计数据...")
        # Statistic columns follow the FIELD_STATTYPE pattern (e.g. PH_MEAN);
        # build a rename map that lower-cases the statistic suffix.
        rename_map = {}
        for field in self.config.STATISTICS_FIELDS:
            for stat_type in self.config.STATISTICS_TYPE:
                # The Statistics tool emits upper-case column names.
                original_col = f"{field}_{stat_type}".upper()
                display_name = f"{field}_{stat_type.lower()}"  # e.g. PH_mean
                rename_map[original_col] = display_name
        # Group-by columns are mapped to themselves (kept unchanged for now).
        for group_field in self.config.GROUP_BY_FIELDS:
            if group_field not in rename_map:  # avoid double entries
                rename_map[group_field] = group_field
        # Apply the renaming.
        df = df.rename(columns=rename_map)
        # Optional column reordering, currently disabled:
        # desired_order = self.config.GROUP_BY_FIELDS + [rename_map.get(f"{f}_{st}".upper(), f"{f}_{st}".lower())
        #                                                for f in self.config.STATISTICS_FIELDS
        #                                                for st in self.config.STATISTICS_TYPE]
        # existing_cols = [col for col in desired_order if col in df.columns]
        # df = df[existing_cols]
        self.logger.info("数据格式化完成。")
        return df

    def _write_to_excel(self, df: pd.DataFrame):
        """
        Write the formatted data to an Excel file and apply styling
        (bold centred header, thin borders, auto column widths, frozen
        header row).
        """
        self.logger.info(f"正在将数据写入Excel文件: {self.config.OUTPUT_REPORT_EXCEL}")
        # Make sure the output directory exists.
        output_dir = os.path.dirname(self.config.OUTPUT_REPORT_EXCEL)
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
        # Create a fresh workbook and use its active sheet.
        wb = Workbook()
        ws = wb.active
        if ws is None:
            self.logger.error("无法获取活动工作表,报告生成失败。")
            return
        ws.title = self.config.SUMMARY_TABLE_NAME  # sheet name
        # Append rows one by one; since the sheet starts empty, r_idx matches
        # the worksheet row number of the row just appended.
        for r_idx, row in enumerate(dataframe_to_rows(df, index=False, header=True), 1):
            ws.append(row)
            if r_idx == 1:  # header row styling
                for cell in ws[r_idx]:
                    cell.font = Font(bold=True)
                    cell.alignment = Alignment(horizontal="center", vertical="center")
                    cell.border = Border(left=Side(style='thin'), right=Side(style='thin'),
                                         top=Side(style='thin'), bottom=Side(style='thin'))
            else:  # data row styling
                for cell in ws[r_idx]:
                    cell.alignment = Alignment(horizontal="center", vertical="center")
                    cell.border = Border(left=Side(style='thin'), right=Side(style='thin'),
                                         top=Side(style='thin'), bottom=Side(style='thin'))
        # Auto-fit column widths from the longest rendered cell value.
        dim_holder = DimensionHolder(worksheet=ws)  # keyword name per openpyxl API
        for col in range(ws.min_column, ws.max_column + 1):
            max_length = 0
            column = get_column_letter(col)
            for cell in ws[column]:
                try:
                    if cell.value is not None and len(str(cell.value)) > max_length:  # skip empty cells
                        max_length = len(str(cell.value))
                except:
                    # NOTE(review): bare except silently ignores any error while
                    # measuring a cell value — consider narrowing.
                    pass
            adjusted_width = (max_length + 2) * 1.2  # add some padding
            # NOTE(review): openpyxl documents ColumnDimension min/max as
            # integer column indices; a letter string is passed here — confirm
            # this works on the installed openpyxl version.
            dim_holder[column] = ColumnDimension(ws, min=column, max=column, width=adjusted_width)
        ws.column_dimensions = dim_holder
        # Freeze the header row.
        ws.freeze_panes = ws['A2']
        # Persist the workbook to disk.
        wb.save(self.config.OUTPUT_REPORT_EXCEL)
        self.logger.info("Excel文件保存成功。")