基于pandas自动化的csv信息提取保存的脚本
在数据处理中遇到大量CSV文件需要提取关键信息保存为新CSV文件,为减轻人力工作,查询大量博客并结合AI编写自动化脚本实现该功能。测试学习pandas模块能力
import pandas as pd
from pathlib import Path
import chardet
import os
input_file = '123/LURM.csv' #待处理文件
#检测文件编码
with open(input_file, 'rb') as f:
raw_data = f.read(10000)# 只读取前10000字节进行检测
result = chardet.detect(raw_data)
encoding = result['encoding']
print(encoding)
#导入文件
df = pd.read_csv(input_file, encoding=encoding)
#print(df)
print(f"原始数据列名: {list(df.columns)}")
#提取指定列
columns_to_extract = ['vn', '报文类型', '采集时间', '上报时间', '车辆状态', '采集时间', '定位有效性', '纬度类型', '经度类型', '经度', '纬度']
extracted_df = df
print(extracted_df)
#保存文件
output_file = 'extracted_result.csv'
#extracted_df.to_csv(output_file, index=False, encoding='utf-8-sig')
extracted_df.to_csv(output_file, index=False, encoding=encoding)上面这段代码实现单文件信息提取并保存。要实现CSV文件处理需要使用到pandas模块,该模块是大数据文件处理较为热门模块。有编程基础的可以看注释,基本可以理解,如果没有建议使用AI工具指导。
正式处理程序
""" CSV文件提取保存程序
代码第一版时间20250911
"""
import pandas as pd
from pathlib import Path
import chardet
import os
def detect_encoding(file_path):
"""检测文件编码"""
try:
with open(file_path, 'rb') as f:
raw_data = f.read(10000)# 只读取前10000字节进行检测
result = chardet.detect(raw_data)
return result['encoding']
except Exception as e:
print(f"编码检测失败: {e}")
return None
def read_csv_with_auto_encoding(file_path):
"""自动检测编码并读取CSV文件"""
# 常见的编码尝试顺序
encodings_to_try = ['utf-8', 'gbk', 'gb2312', 'gb18030', 'latin1', 'cp1252']
# 首先尝试自动检测编码
detected_encoding = detect_encoding(file_path)
if detected_encoding:
encodings_to_try.insert(0, detected_encoding)
for encoding in encodings_to_try:
try:
df = pd.read_csv(file_path, encoding=encoding)
print(f"✓ 成功使用 {encoding} 编码读取文件")
return df, encoding
except UnicodeDecodeError:
continue
except Exception as e:
# 如果不是编码问题,可能是其他错误
if 'codec' not in str(e).lower():
raise e
# 如果所有编码都失败,尝试使用错误处理
try:
df = pd.read_csv(file_path, encoding='utf-8', errors='ignore')
print("✓ 使用utf-8编码并忽略错误字符读取文件")
return df, 'utf-8-with-errors-ignored'
except Exception as e:
raise Exception(f"所有编码尝试都失败: {e}")
def save_dataframe_to_csv(extracted_df, file_name, encoding_used, output_folder="output"):
"""
保存提取后的DataFrame到CSV文件
参数:
extracted_df (pd.DataFrame): 要保存的数据框
file_name (str): 原始文件名(不含扩展名)
encoding_used (str): 使用的编码格式
output_folder (str): 输出文件夹名,默认为"output"
返回:
bool: 保存成功返回True,失败返回False
"""
try:
# 创建输出文件夹(如果不存在)
output_path = Path(output_folder)
output_path.mkdir(exist_ok=True)
# 构建输出文件名:原文件名 + "o" + .csv
output_file = output_path / f"{file_name}_o.csv"
# 检查文件是否已存在
if output_file.exists():
raise FileExistsError(f"文件已存在: {output_file}")
# 保存为CSV文件
extracted_df.to_csv(output_file, index=False, encoding=encoding_used)
print(f"✓ 数据已成功保存到: {output_file}")
print(f"使用编码: {encoding_used}")
print(f"数据形状: {extracted_df.shape} 行, {extracted_df.shape} 列")
return True
except FileExistsError as e:
print(f"⚠️文件已存在错误: {e}")
return False
except Exception as e:
print(f"❌ 保存文件失败: {e}")
return False
def read_all_csv_files(folder_path):
"""
读取指定文件夹下的所有CSV文件(支持自动编码检测)
"""
if not os.path.exists(folder_path):
print(f"错误: 文件夹 '{folder_path}' 不存在")
return {}
csv_files = list(Path(folder_path).glob("*.csv"))
if len(csv_files) == 0:
print(f"警告: 在 '{folder_path}' 中没有找到CSV文件")
return {}
dataframes = {}
print(f"找到 {len(csv_files)} 个CSV文件:")
print("-" * 50)
for csv_file in csv_files:
try:
print(f"\n正在处理: {csv_file.name}")
df, encoding_used = read_csv_with_auto_encoding(csv_file)
print(f"✓ 成功读取: {csv_file.name}")
print(f"编码: {encoding_used}")
print(f"数据形状: {df.shape} 行, {df.shape} 列")
print("-" * 50)
print(f"原始数据列名: {list(df.columns)}")
columns_to_extract = ['vn', '报文类型', '采集时间', '上报时间', '车辆状态', '采集时间', '定位有效性', '纬度类型', '经度类型', '经度', '纬度']
extracted_df = df
#print(extracted_df)
# 使用文件名(不含路径和扩展名)作为键
file_name = csv_file.stem
dataframes = df
#保存提取后信息到文件
save_success = save_dataframe_to_csv(extracted_df, file_name, encoding_used)
if save_success:
print("✓ 提取的数据已成功保存")
else:
print("⚠️提取的数据保存失败")
except Exception as e:
print(f"✗ 无法读取 {csv_file.name}: {str(e)}")
return dataframes
# 主程序
if __name__ == "__main__":
# 设置你的文件夹路径
folder_path = "C:/Users/Downloads/LURM"
# 读取所有CSV文件
all_data = read_all_csv_files(folder_path)
print(f"\n{'='*50}")
print(f"总结: 成功读取了 {len(all_data)} 个CSV文件")
# 显示每个文件的基本信息
# for name, df in all_data.items():
# print(f"\n{name}:")
# print(f"形状: {df.shape}")
# if len(df.columns) > 0:
# print(f"列名: {list(df.columns)[:5]}{'...' if len(df.columns) > 5 else ''}")
# if len(df) > 0:
# print(f"数据预览:")
# print(df.head(2))正式程序的核心代码和测试程序一样,只不过添加大量错误检测和现实文件信息提取的代码(该段代码已注释)。
代码写的比较烂,勿喷,有问题请回复指出。
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页:
[1]