引言:为什么你需要学习Python自动化
在当今快节奏的工作环境中,重复性的手动操作正在吞噬我们宝贵的时间和创造力。想象一下,每天早上花费30分钟整理Excel报表,或者手动备份上百个文件,这些任务不仅枯燥,还容易出错。Python自动化正是解决这些痛点的完美方案。
Python作为一门简洁而强大的编程语言,凭借其丰富的库生态系统和简单的语法,已经成为自动化领域的首选工具。无论你是数据分析师、行政人员还是项目经理,掌握Python自动化都能让你的工作效率提升数倍。
本文将从最基础的环境搭建开始,通过大量实际案例,手把手教你如何使用Python自动化处理文件操作、数据处理、网络爬虫、邮件发送等常见任务。每个知识点都会配有详细的代码示例和运行说明,确保你能真正理解和应用。
第一部分:Python环境搭建与基础准备
1.1 安装Python和必要的工具
首先,我们需要安装Python环境。推荐使用Python 3.8或更高版本,因为这些版本对现代库有更好的支持。
Windows系统安装步骤:
- 访问Python官网(https://www.python.org/downloads/)
- 下载最新的Python 3.x安装包
- 运行安装程序,务必勾选“Add Python to PATH”选项
- 点击“Install Now”完成安装
验证安装: 打开命令提示符(Win+R,输入cmd),输入:
python --version
如果显示类似“Python 3.9.7”,说明安装成功。
1.2 安装关键的自动化库
Python的强大在于其丰富的第三方库。以下是自动化必备的库:
# 文件和目录操作:pathlib 属于 Python 标准库,无需安装
# Excel自动化
pip install openpyxl pandas
# PDF处理(4.2节的表格提取还需要 pdfplumber)
pip install PyPDF2 pdfplumber
# 网络请求与网页解析(5.2节的爬虫需要 beautifulsoup4)
pip install requests beautifulsoup4
# 邮件发送:smtplib、email、imaplib 属于 Python 标准库,无需安装
# 定时任务
pip install schedule
# GUI自动化
pip install pyautogui
# 数据库操作:sqlite3 属于 Python 标准库,无需安装
1.3 选择合适的代码编辑器
推荐使用VS Code或PyCharm:
- VS Code:轻量级,安装Python插件后功能强大
- PyCharm:专业级IDE,功能全面但较重
第二部分:文件和目录自动化管理
2.1 使用pathlib进行现代化的文件操作
传统的os模块虽然功能强大,但pathlib提供了更现代化、面向对象的文件操作方式。
案例:自动整理下载文件夹 假设你的下载文件夹杂乱无章,需要按文件类型自动分类整理:
from pathlib import Path
import shutil
def _unique_target(folder, name):
    """Return a path inside *folder* for *name* that does not exist yet.

    When ``folder / name`` is taken, ``" (1)"``, ``" (2)"``, ... is inserted
    before the extension until a free name is found.
    """
    candidate = folder / name
    stem, suffix = candidate.stem, candidate.suffix
    counter = 1
    while candidate.exists():
        candidate = folder / f"{stem} ({counter}){suffix}"
        counter += 1
    return candidate


def organize_downloads(downloads_path=None):
    """Sort the loose files of a folder into per-category subfolders.

    Args:
        downloads_path: Folder to organise. Defaults to ``~/Downloads``.

    Each regular, non-hidden file directly inside the folder is moved to the
    subfolder matching its (case-insensitive) extension; files with an
    unlisted extension go to ``Others``. Subdirectories and dot-files are left
    alone. A file is never moved on top of an existing one: a numbered name is
    used instead. A failed move is reported and the file stays where it was.
    """
    if downloads_path is None:
        downloads_path = Path.home() / "Downloads"
    else:
        downloads_path = Path(downloads_path)

    categories = {
        "Images": [".jpg", ".png", ".gif", ".bmp", ".svg"],
        "Documents": [".pdf", ".docx", ".txt", ".xlsx", ".pptx"],
        "Videos": [".mp4", ".avi", ".mov", ".mkv"],
        "Music": [".mp3", ".wav", ".flac"],
        "Archives": [".zip", ".rar", ".7z", ".tar"],
        "Executables": [".exe", ".msi", ".dmg", ".pkg"],
    }
    # Inverted table: one dictionary lookup per file instead of scanning
    # every category list.
    category_by_extension = {
        extension: category
        for category, extensions in categories.items()
        for extension in extensions
    }

    for folder in categories:
        (downloads_path / folder).mkdir(exist_ok=True)

    # Snapshot the listing first: the loop below moves entries and may create
    # the "Others" folder inside the very directory being listed.
    for file_path in list(downloads_path.iterdir()):
        if file_path.is_dir() or file_path.name.startswith('.'):
            continue

        category = category_by_extension.get(file_path.suffix.lower(), "Others")
        target_folder = downloads_path / category
        # "Others" is only created once a file actually needs it.
        target_folder.mkdir(exist_ok=True)
        target_path = _unique_target(target_folder, file_path.name)

        try:
            shutil.move(str(file_path), str(target_path))
            print(f"已移动: {file_path.name} -> {category}/")
        except OSError as e:
            # Leave the file in place; do not retry into another folder.
            print(f"移动失败 {file_path.name}: {e}")
# 运行函数
organize_downloads()
代码详细说明:
- Path.home():获取用户主目录,跨平台兼容
- Path.home() / "Downloads":使用 / 运算符拼接路径,自动处理路径分隔符
- mkdir(exist_ok=True):创建目录,如果已存在则不报错
- iterdir():遍历目录中的所有条目
- shutil.move():安全地移动文件,保留元数据
2.2 批量重命名文件
案例:批量重命名照片文件,添加日期前缀
from pathlib import Path
from datetime import datetime
def batch_rename_photos(photos_path="C:/Users/YourName/Pictures/2024"):
    """Prefix every JPG/PNG in a folder with its modification date.

    Args:
        photos_path: Folder holding the photos. Defaults to the sample path
            used by the tutorial.

    ``cat.jpg`` last modified on 2024-03-05 becomes ``20240305_cat.jpg``.
    The function is safe to run repeatedly: a file that already carries its
    own date prefix is skipped, and a rename that would land on an existing
    file is refused instead of overwriting it.
    """
    photos_path = Path(photos_path)
    wanted_suffixes = {".jpg", ".png"}

    # Compare suffixes in lower case so the match does not depend on whether
    # the platform's glob is case-sensitive.
    photo_files = sorted(
        entry for entry in photos_path.glob("*")
        if entry.is_file() and entry.suffix.lower() in wanted_suffixes
    )

    for photo in photo_files:
        mtime = photo.stat().st_mtime
        date_str = datetime.fromtimestamp(mtime).strftime("%Y%m%d")

        # Already processed by an earlier run: do not stack another prefix.
        if photo.name.startswith(f"{date_str}_"):
            continue

        new_name = f"{date_str}_{photo.name}"
        new_path = photo.parent / new_name
        if new_path.exists():
            print(f"重命名失败 {photo.name}: 目标文件已存在 {new_name}")
            continue

        try:
            photo.rename(new_path)
            print(f"重命名: {photo.name} -> {new_name}")
        except OSError as e:
            print(f"重命名失败 {photo.name}: {e}")
# 运行
batch_rename_photos()
第三部分:Excel自动化处理
3.1 使用openpyxl处理Excel文件
openpyxl是读写Excel 2010 xlsx/xlsm/xltx/xltm文件的库。
案例:自动生成销售报表
from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
from openpyxl.utils import get_column_letter
import random
from datetime import datetime, timedelta
def generate_sales_report():
    """Build a styled 30-day sample sales workbook and save it.

    The sheet holds a header row, 30 rows of random sample data for
    consecutive days starting 2024-01-01, and a total row whose sales cell is
    a ``SUM`` formula. The workbook is written to the current directory as
    ``销售报表_<YYYYmmdd_HHMMSS>.xlsx`` and the file name is printed.
    """
    # Unit price per product. Built once; the product list is derived from it
    # so the two can never disagree.
    prices = {"笔记本电脑": 5000, "智能手机": 3000, "平板电脑": 2000, "耳机": 500, "智能手表": 1500}
    products = list(prices)
    salespersons = ["张三", "李四", "王五", "赵六"]
    start_date = datetime(2024, 1, 1)

    first_data_row = 2          # row 1 is the header
    day_count = 30
    total_row = first_data_row + day_count

    wb = Workbook()
    ws = wb.active
    ws.title = "销售报表"

    headers = ["日期", "产品名称", "单价", "数量", "销售额", "销售员"]
    ws.append(headers)
    column_count = len(headers)

    # Header styling: white bold text on a blue fill, centred.
    header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
    header_font = Font(bold=True, color="FFFFFF", size=12)
    header_alignment = Alignment(horizontal="center", vertical="center")
    for col in range(1, column_count + 1):
        cell = ws.cell(row=1, column=col)
        cell.fill = header_fill
        cell.font = header_font
        cell.alignment = header_alignment

    # One random sample row per day.
    for row in range(first_data_row, total_row):
        day = start_date + timedelta(days=row - first_data_row)
        product = random.choice(products)
        price = prices[product]
        quantity = random.randint(1, 10)
        salesperson = random.choice(salespersons)

        ws.cell(row=row, column=1, value=day.strftime("%Y-%m-%d"))
        ws.cell(row=row, column=2, value=product)
        ws.cell(row=row, column=3, value=price)
        ws.cell(row=row, column=4, value=quantity)
        ws.cell(row=row, column=5, value=price * quantity)
        ws.cell(row=row, column=6, value=salesperson)

    # Total row: the sum is left as a formula for the spreadsheet to evaluate.
    ws.cell(row=total_row, column=1, value="总计")
    ws.cell(row=total_row, column=5, value=f"=SUM(E{first_data_row}:E{total_row-1})")
    for col in range(1, column_count + 1):
        cell = ws.cell(row=total_row, column=col)
        cell.font = Font(bold=True)
        cell.fill = PatternFill(start_color="FFD700", end_color="FFD700", fill_type="solid")

    column_widths = [12, 20, 10, 10, 12, 10]
    for i, width in enumerate(column_widths, 1):
        ws.column_dimensions[get_column_letter(i)].width = width

    # Currency format for the price and sales columns, total row included so
    # the grand total is displayed like the figures it sums.
    for row in range(first_data_row, total_row + 1):
        ws.cell(row=row, column=3).number_format = '¥#,##0'
        ws.cell(row=row, column=5).number_format = '¥#,##0'

    thin_border = Border(left=Side(style='thin'),
                         right=Side(style='thin'),
                         top=Side(style='thin'),
                         bottom=Side(style='thin'))
    for row in ws.iter_rows(min_row=1, max_row=total_row, min_col=1, max_col=column_count):
        for cell in row:
            cell.border = thin_border

    filename = f"销售报表_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
    wb.save(filename)
    print(f"报表已生成: {filename}")
# 运行
generate_sales_report()
3.2 使用pandas进行高级数据处理
pandas是数据分析的利器,特别适合处理大量数据。
案例:读取多个Excel文件并合并分析
import pandas as pd
from pathlib import Path
import glob
def merge_and_analyze_excel(data_folder="C:/Data/Sales", output_file="销售分析报告.xlsx"):
    """Merge all ``sales_*.xlsx`` workbooks of a folder and write summary sheets.

    Args:
        data_folder: Folder searched for ``sales_*.xlsx`` files.
        output_file: Path of the analysis workbook to create.

    The merged data is summarised per product, per salesperson and per month,
    and each view is written to its own sheet of *output_file*. A workbook
    that cannot be read is reported and skipped rather than aborting the run.
    When no workbook can be read, nothing is written and the function returns.
    The code reads the columns ``日期``, ``销售额``, ``数量``, ``产品名称`` and
    ``销售员`` from the merged data.
    """
    data_folder = Path(data_folder)

    # Sorted so the merged row order does not depend on directory order.
    excel_files = sorted(data_folder.glob("sales_*.xlsx"))
    if not excel_files:
        print("未找到销售数据文件")
        return

    all_data = []
    for file in excel_files:
        try:
            df = pd.read_excel(file)
        except Exception as e:
            print(f"读取失败 {file.name}: {e}")
            continue
        df['来源文件'] = file.name  # remember where each row came from
        all_data.append(df)

    # pd.concat raises on an empty list, so stop here if every file failed.
    if not all_data:
        print("未找到销售数据文件")
        return

    combined_df = pd.concat(all_data, ignore_index=True)

    # Normalise types; unparseable sales figures become NaN.
    combined_df['日期'] = pd.to_datetime(combined_df['日期'])
    combined_df['销售额'] = pd.to_numeric(combined_df['销售额'], errors='coerce')

    product_summary = combined_df.groupby('产品名称').agg({
        '销售额': ['sum', 'mean', 'count'],
        '数量': 'sum'
    }).round(2)

    salesperson_summary = combined_df.groupby('销售员').agg({
        '销售额': 'sum'
    }).sort_values('销售额', ascending=False)

    combined_df['月份'] = combined_df['日期'].dt.to_period('M')
    monthly_summary = combined_df.groupby('月份')['销售额'].sum()

    with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
        combined_df.to_excel(writer, sheet_name='原始数据', index=False)
        product_summary.to_excel(writer, sheet_name='产品分析')
        salesperson_summary.to_excel(writer, sheet_name='销售员分析')
        monthly_summary.to_excel(writer, sheet_name='月度分析')

    print(f"分析完成,结果保存至: {output_file}")
    print("\n销售员排名:")
    print(salesperson_summary)
# 运行
merge_and_analyze_excel()
第四部分:PDF文档自动化处理
4.1 使用PyPDF2合并和拆分PDF
案例:合并多个PDF报告(注意:下面的示例只完成合并,并未给页面添加页码)
from PyPDF2 import PdfReader, PdfWriter
from pathlib import Path
def merge_pdfs_with_page_numbers():
    """Concatenate every ``report_*.pdf`` in ``C:/Reports`` into one PDF.

    Inputs are processed in sorted path order. A file that cannot be read is
    reported and skipped. The result is written to ``合并报告.pdf`` in the same
    folder and the total page count is printed.

    Note: despite the name, the pages are copied unchanged; this function
    does not stamp page numbers onto them.
    """
    source_dir = Path("C:/Reports")
    inputs = sorted(source_dir.glob("report_*.pdf"))

    if len(inputs) == 0:
        print("未找到PDF文件")
        return

    combined = PdfWriter()
    for path in inputs:
        try:
            for page in PdfReader(str(path)).pages:
                combined.add_page(page)
        except Exception as err:
            print(f"读取失败 {path.name}: {err}")
        else:
            print(f"已添加: {path.name}")

    destination = source_dir / "合并报告.pdf"
    with open(destination, "wb") as sink:
        combined.write(sink)

    print(f"合并完成: {destination}")
    print(f"总页数: {len(combined.pages)}")
# 运行
merge_pdfs_with_page_numbers()
4.2 提取PDF中的文本和表格数据
import pdfplumber
import pandas as pd
def extract_data_from_pdf():
    """Collect every table found in ``C:/Data/财务报表.pdf`` into one Excel file.

    The first row of each extracted table is used as its column header. Two
    bookkeeping columns are added: ``页码`` (1-based page number) and
    ``表格编号`` (1-based table index within the page). All tables are
    stacked and written to ``提取的表格数据.xlsx``; when the PDF contains no
    tables only a message is printed.
    """
    pdf_path = "C:/Data/财务报表.pdf"
    frames = []

    with pdfplumber.open(pdf_path) as pdf:
        for page_num, page in enumerate(pdf.pages, 1):
            for table_no, rows in enumerate(page.extract_tables(), 1):
                frame = pd.DataFrame(rows[1:], columns=rows[0])
                frame['页码'] = page_num
                frame['表格编号'] = table_no
                frames.append(frame)

    # Guard clause: nothing to save when no table was found.
    if not frames:
        print("未找到表格")
        return

    combined_df = pd.concat(frames, ignore_index=True)
    output_file = "提取的表格数据.xlsx"
    combined_df.to_excel(output_file, index=False)
    print(f"提取完成,共{len(combined_df)}行数据")
    print(combined_df.head())
# 运行
extract_data_from_pdf()
第五部分:网络自动化与爬虫
5.1 使用requests进行API调用
案例:自动获取天气数据并生成报告
import requests
import json
from datetime import datetime
from pathlib import Path
def get_weather_report():
    """Fetch current weather for a fixed list of cities and save two reports.

    For each city the OpenWeatherMap current-weather endpoint is queried
    (metric units). Successful results are written to ``weather_report.json``
    and, in readable form, to ``天气报告.txt``. A city whose request fails or
    whose response lacks an expected field is reported and skipped, so one
    bad answer does not abort the whole run.
    """
    API_KEY = "YOUR_API_KEY"  # obtain one by registering with the service
    cities = ["Beijing", "Shanghai", "Guangzhou", "Shenzhen"]
    # HTTPS, because the API key travels in the query string. The URL is the
    # same for every city, so it is built once outside the loop.
    url = "https://api.openweathermap.org/data/2.5/weather"

    weather_data = []
    for city in cities:
        params = {
            "q": city,
            "appid": API_KEY,
            "units": "metric",  # degrees Celsius
        }
        try:
            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()
            weather_info = {
                "城市": city,
                "天气": data["weather"][0]["description"],
                "温度": data["main"]["temp"],
                "湿度": data["main"]["humidity"],
                "风速": data["wind"]["speed"],
                "获取时间": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            }
        except (requests.exceptions.RequestException,
                KeyError, IndexError, ValueError) as e:
            # Network/HTTP failures, plus a body that is not JSON or does not
            # have the fields read above.
            print(f"获取 {city} 天气数据失败: {e}")
            continue
        weather_data.append(weather_info)
        print(f"已获取 {city} 的天气数据")

    # Machine-readable copy.
    output_file = Path("weather_report.json")
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(weather_data, f, ensure_ascii=False, indent=2)
    print(f"天气报告已保存: {output_file}")

    # Human-readable copy.
    report_file = Path("天气报告.txt")
    with open(report_file, "w", encoding="utf-8") as f:
        f.write("今日天气报告\n")
        f.write("=" * 30 + "\n")
        for entry in weather_data:
            f.write(f"城市: {entry['城市']}\n")
            f.write(f"天气: {entry['天气']}\n")
            f.write(f"温度: {entry['温度']}°C\n")
            f.write(f"湿度: {entry['湿度']}%\n")
            f.write(f"风速: {entry['风速']} m/s\n")
            f.write(f"时间: {entry['获取时间']}\n")
            f.write("-" * 30 + "\n")
    print(f"文本报告已保存: {report_file}")
# 运行
# get_weather_report() # 需要有效的API密钥
5.2 网页爬虫基础
案例:爬取新闻网站标题和链接
import requests
from bs4 import BeautifulSoup
from pathlib import Path
from datetime import datetime
def scrape_news_titles():
    """Download a news page and save the headlines it contains as JSON.

    Every ``<div class="news-title">`` on the page contributes one record
    with its text, the ``href`` of its first link (``'N/A'`` when there is no
    link or the link has no ``href``) and the capture time. Records are
    written to ``news_titles.json`` and the first five titles are printed.
    Any failure is reported and ends the run without raising.

    The URL and the CSS class are placeholders to adapt to the real site.
    """
    # Imported here because the import list of this snippet does not
    # include json, and the result is written with json.dump below.
    import json

    url = "https://example-news-site.com"
    # Browser-like User-Agent header sent with the request.
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    }

    try:
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')

        news_items = []
        for item in soup.find_all('div', class_='news-title'):
            anchor = item.find('a')  # looked up once, reused below
            # .get() avoids a KeyError for an <a> without an href attribute.
            link = anchor.get('href', 'N/A') if anchor is not None else 'N/A'
            news_items.append({
                '标题': item.get_text(strip=True),
                '链接': link,
                '抓取时间': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            })

        output_file = Path("news_titles.json")
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(news_items, f, ensure_ascii=False, indent=2)
        print(f"成功抓取 {len(news_items)} 条新闻")
        print(f"结果保存至: {output_file}")

        for i, news in enumerate(news_items[:5], 1):
            print(f"{i}. {news['标题']}")
    except Exception as e:
        # Top-level boundary of the script: report and stop.
        print(f"爬取失败: {e}")
# 运行
# scrape_news_titles()
第六部分:邮件自动化
6.1 发送普通邮件
案例:自动发送日报给团队成员
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
from datetime import datetime
from pathlib import Path
def send_daily_report():
    """E-mail the HTML daily report, attaching the sales workbook if present.

    The message is sent over SMTP-over-SSL with the account settings defined
    at the top of the function. ``销售报表.xlsx`` is attached only when it
    exists in the working directory. Success or failure is printed; the
    function does not raise on a send error.
    """
    smtp_server = "smtp.qq.com"  # change to match your mail provider
    smtp_port = 465              # SSL port
    sender_email = "your_email@qq.com"
    sender_password = "your_authorization_code"  # authorization code, not the login password
    receiver_emails = ["team1@company.com", "team2@company.com"]

    msg = MIMEMultipart()
    msg['From'] = sender_email
    msg['To'] = ", ".join(receiver_emails)
    msg['Subject'] = f"工作日报 - {datetime.now().strftime('%Y-%m-%d')}"

    body = f"""
<html>
<body>
<h2>今日工作日报</h2>
<p>发送时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
<h3>已完成工作:</h3>
<ul>
<li>完成销售数据分析报告</li>
<li>整理客户反馈意见</li>
<li>更新项目进度表</li>
</ul>
<h3>明日计划:</h3>
<ul>
<li>准备周会材料</li>
<li>跟进客户需求</li>
</ul>
<p><strong>附件:</strong> 今日销售数据.xlsx</p>
</body>
</html>
"""
    msg.attach(MIMEText(body, 'html'))

    attachment_path = Path("销售报表.xlsx")
    if attachment_path.exists():
        with open(attachment_path, "rb") as f:
            attachment = MIMEApplication(f.read())
        attachment.add_header('Content-Disposition', 'attachment',
                              filename=attachment_path.name)
        msg.attach(attachment)

    try:
        # The context manager closes the connection even when login or
        # sendmail raises, so a failed send no longer leaks the socket.
        with smtplib.SMTP_SSL(smtp_server, smtp_port) as server:
            server.login(sender_email, sender_password)
            server.sendmail(sender_email, receiver_emails, msg.as_string())
        print("邮件发送成功!")
    except Exception as e:
        print(f"邮件发送失败: {e}")
# 运行
# send_daily_report()
6.2 检查邮箱并自动回复
import imaplib
import email
from email.header import decode_header
import time
def _decode_first_header_part(raw_value):
    """Return the first decoded fragment of a mail header as text.

    A missing header (``None``) yields an empty string.
    """
    if raw_value is None:
        return ""
    text, encoding = decode_header(raw_value)[0]
    if isinstance(text, bytes):
        text = text.decode(encoding or 'utf-8', errors='replace')
    return text


def check_and_auto_reply():
    """List the unread messages of the inbox (hook point for auto-replies).

    Connects over IMAP-over-SSL, searches ``INBOX`` for ``UNSEEN`` messages
    and prints subject and sender of each, then the number handled. The
    actual reply is left as a placeholder. Errors are printed, not raised,
    and the connection is released whether or not an error occurred.
    """
    imap_server = "imap.qq.com"
    username = "your_email@qq.com"
    password = "your_authorization_code"

    mail = None
    try:
        mail = imaplib.IMAP4_SSL(imap_server)
        mail.login(username, password)
        mail.select("INBOX")

        status, messages = mail.search(None, 'UNSEEN')
        # Always bound, so the summary below works when the search fails.
        email_ids = messages[0].split() if status == "OK" else []

        for e_id in email_ids:
            status, msg_data = mail.fetch(e_id, '(RFC822)')
            if status != "OK":
                continue
            msg = email.message_from_bytes(msg_data[0][1])
            from_email = _decode_first_header_part(msg["From"])
            subject = _decode_first_header_part(msg["Subject"])
            print(f"收到新邮件: {subject} - 来自 {from_email}")
            # Placeholder: send the automatic reply here, e.g. by calling
            # send_daily_report() or sending a dedicated reply message.

        print(f"处理了 {len(email_ids)} 封新邮件")
    except Exception as e:
        print(f"邮箱处理失败: {e}")
    finally:
        if mail is not None:
            # Best-effort cleanup: close() and logout() can themselves fail
            # if the session broke earlier (for example a failed login), and
            # that must not mask the error already reported above.
            try:
                mail.close()
            except (imaplib.IMAP4.error, OSError):
                pass
            try:
                mail.logout()
            except (imaplib.IMAP4.error, OSError):
                pass
# 运行
# check_and_auto_reply()
第七部分:GUI自动化
7.1 使用pyautogui控制鼠标和键盘
案例:自动填写网页表单
import pyautogui
import time
import random
def auto_fill_form():
    """Click through four form fields, type a value into each, then submit.

    Waits five seconds so the target window can be focused, then for each
    field clicks its screen coordinate and types the value with a short
    per-key delay. Finally the submit coordinate is clicked. All coordinates
    are sample values; ``pyautogui.position()`` reports the current pointer
    position and can be used to find the real ones.
    """
    # Fail-safe switch of pyautogui (moving the pointer to the top-left
    # screen corner is meant to abort the run).
    pyautogui.FAILSAFE = True

    print("请在5秒内切换到要填写的窗口...")
    time.sleep(5)

    field_positions = {
        "姓名": (200, 300),
        "邮箱": (200, 350),
        "电话": (200, 400),
        "备注": (200, 450)
    }
    form_data = {
        "姓名": "张三",
        "邮箱": "zhangsan@example.com",
        "电话": "13800138000",
        "备注": "自动填写的测试数据"
    }

    # NOTE(review): pyautogui.write() reportedly types only characters that
    # exist as keyboard keys, so the Chinese values above may not be entered
    # as written; confirm against the PyAutoGUI keyboard documentation.
    for field in form_data:
        x, y = field_positions[field]
        pyautogui.click(x, y)
        time.sleep(0.5)
        pyautogui.write(form_data[field], interval=0.1)  # per-key delay
        time.sleep(0.5)

    submit_x, submit_y = (200, 550)
    pyautogui.click(submit_x, submit_y)
    print("表单填写完成!")
# 运行
# auto_fill_form()
第八部分:定时任务与自动化工作流
8.1 使用schedule库创建定时任务
案例:每天早上9点自动运行数据备份
import schedule
import time
from datetime import datetime
from pathlib import Path
import shutil
def backup_data(source="C:/重要数据", backup_dir="D:/备份"):
    """Copy a directory tree into a fresh timestamped backup folder.

    Args:
        source: Directory to back up.
        backup_dir: Directory that receives the ``backup_<timestamp>``
            folder; it is created, including missing parents, when absent.

    Returns:
        The path of the new backup folder, or ``None`` when *source* does not
        exist or the copy failed (a message is printed in both cases).
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    source = Path(source)
    backup_dir = Path(backup_dir)

    if not source.exists():
        print("源目录不存在")
        return None

    backup_path = backup_dir / f"backup_{timestamp}"
    try:
        # parents=True: a nested destination that does not exist yet must
        # not make the backup fail.
        backup_dir.mkdir(parents=True, exist_ok=True)
        shutil.copytree(source, backup_path)
    except Exception as e:
        print(f"备份失败: {e}")
        return None

    print(f"[{datetime.now()}] 备份完成: {backup_path}")
    return backup_path
def daily_report_task():
    """Daily-report job: print a timestamped start line and a finish line.

    The report itself is not produced here; the calls that would do so are
    left as a placeholder.
    """
    started_at = datetime.now()
    print(f"[{started_at}] 开始生成日报...")

    # Placeholder: call generate_sales_report() and send_daily_report() here.

    finished_at = datetime.now()
    print(f"[{finished_at}] 日报任务完成")
def setup_schedule():
    """Register the sample jobs with ``schedule`` and run them until interrupted.

    Jobs: backup every day at 09:00, daily report every day at 18:00, a
    weekly-report message every Monday at 09:30 and a mail-check message
    every five minutes. After printing a summary the function polls for due
    jobs once per second and does not return.
    """
    # Each schedule.every() call creates its own job, so every registration
    # starts from a fresh call.
    schedule.every().day.at("09:00").do(backup_data)
    schedule.every().day.at("18:00").do(daily_report_task)
    schedule.every().monday.at("09:30").do(lambda: print("周报任务"))
    schedule.every(5).minutes.do(lambda: print("检查邮件..."))

    summary_lines = (
        "定时任务已设置:",
        "- 每天09:00 执行数据备份",
        "- 每天18:00 生成并发送日报",
        "- 每周一09:30 生成周报",
        "- 每5分钟 检查邮件",
        "\n程序正在运行,按 Ctrl+C 退出...",
    )
    for line in summary_lines:
        print(line)

    # Polling loop: run whatever is due, then wait one second.
    while True:
        schedule.run_pending()
        time.sleep(1)
# 运行
# setup_schedule()
8.2 使用Windows任务计划程序(替代方案)
对于Windows用户,还可以使用系统自带的任务计划程序:
# 创建批处理文件 backup.bat
"""
@echo off
cd /d C:\你的脚本路径
python backup_script.py
"""
# 创建任务计划
# 1. 打开任务计划程序
# 2. 创建基本任务
# 3. 设置触发器(每天9:00)
# 4. 操作:启动程序
# 5. 选择 backup.bat
第九部分:综合案例:全自动工作日报系统
9.1 完整的日报生成和发送系统
import pandas as pd
from pathlib import Path
from datetime import datetime, timedelta
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
import json
import shutil
class DailyReportAutomation:
    """Collect the day's data, build a text report, archive it and e-mail it.

    Settings (data sources, mail account, output folders) are read from a
    JSON file, which is created with default values on first use.
    """

    def __init__(self, config_file="config.json"):
        """Load the configuration and record today's date.

        Args:
            config_file: Path of the JSON settings file.
        """
        self.config = self.load_config(config_file)
        self.timestamp = datetime.now().strftime("%Y-%m-%d")

    def load_config(self, config_file):
        """Return the settings stored in *config_file*.

        When the file does not exist it is created with the default settings
        and those defaults are returned.
        """
        default_config = {
            "data_sources": {
                "sales": "C:/Data/Sales/sales_*.xlsx",
                "tasks": "C:/Data/Tasks/task_list.xlsx"
            },
            "email": {
                "smtp_server": "smtp.qq.com",
                "smtp_port": 465,
                "sender": "your_email@qq.com",
                "password": "your_authorization_code",
                "receivers": ["manager@company.com", "team@company.com"]
            },
            "output": {
                "report_dir": "C:/Reports/Daily",
                "backup_dir": "C:/Reports/Backup"
            }
        }

        config_path = Path(config_file)
        if not config_path.exists():
            with open(config_path, "w", encoding="utf-8") as f:
                json.dump(default_config, f, ensure_ascii=False, indent=2)
            print(f"已创建默认配置文件: {config_file}")
            return default_config

        with open(config_path, "r", encoding="utf-8") as f:
            return json.load(f)

    def collect_data(self):
        """Read today's sales rows and the tasks currently in progress.

        The file locations come from ``config["data_sources"]``: ``sales`` is
        a folder plus file-name pattern, ``tasks`` a single workbook. When a
        key is missing the default locations are used. Unreadable files are
        reported and skipped.

        Returns:
            ``(sales_df, tasks_df)``; either frame is empty when nothing
            matching was found.
        """
        print("正在收集数据...")
        sources = self.config.get("data_sources", {})
        sales_pattern = Path(sources.get("sales", "C:/Data/Sales/sales_*.xlsx"))
        tasks_file = Path(sources.get("tasks", "C:/Data/Tasks/task_list.xlsx"))

        # Same for every file, so computed once.
        today = pd.to_datetime(self.timestamp).date()

        all_sales = []
        # The configured value is "<folder>/<pattern>": glob the pattern
        # inside the folder.
        for file in sorted(sales_pattern.parent.glob(sales_pattern.name)):
            try:
                df = pd.read_excel(file)
                df['日期'] = pd.to_datetime(df['日期'])
                df = df[df['日期'].dt.date == today]  # keep today's rows only
                if not df.empty:
                    all_sales.append(df)
            except Exception as e:
                print(f"读取销售文件失败 {file}: {e}")
        sales_df = pd.concat(all_sales, ignore_index=True) if all_sales else pd.DataFrame()

        tasks_df = pd.DataFrame()
        try:
            if tasks_file.exists():
                tasks_df = pd.read_excel(tasks_file)
                tasks_df = tasks_df[tasks_df['状态'] == '进行中']
        except Exception as e:
            print(f"读取任务文件失败: {e}")

        return sales_df, tasks_df

    def generate_report(self, sales_df, tasks_df):
        """Render the report text from the collected frames.

        Args:
            sales_df: Sales rows; the columns ``销售额`` and ``产品名称`` are read.
            tasks_df: Task rows; the columns ``任务名称`` and ``进度`` are read.

        Returns:
            The complete report as one newline-joined string.
        """
        print("正在生成报告...")
        report_content = []
        report_content.append(f"工作日报 - {self.timestamp}")
        report_content.append("=" * 50)
        report_content.append("")

        if not sales_df.empty:
            report_content.append("【销售数据】")
            total_sales = sales_df['销售额'].sum()
            total_orders = len(sales_df)
            report_content.append(f"今日订单数: {total_orders}")
            report_content.append(f"今日销售额: ¥{total_sales:,.2f}")
            product_summary = sales_df.groupby('产品名称')['销售额'].sum()
            report_content.append("\n产品销售排行:")
            for product, sales in product_summary.items():
                report_content.append(f" - {product}: ¥{sales:,.2f}")
        else:
            report_content.append("【销售数据】今日暂无销售数据")
        report_content.append("")

        if not tasks_df.empty:
            report_content.append("【任务进度】")
            for _, task in tasks_df.iterrows():
                report_content.append(f" - {task['任务名称']} ({task['进度']}%)")
        else:
            report_content.append("【任务进度】暂无进行中的任务")
        report_content.append("")

        report_content.append("【明日计划】")
        report_content.append("1. 继续跟进客户需求")
        report_content.append("2. 准备周会材料")
        report_content.append("3. 完成数据分析报告")
        report_content.append("")
        report_content.append(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        return "\n".join(report_content)

    def save_report(self, report_content):
        """Write the report to the report folder and copy it to the backup folder.

        Both folders come from ``config["output"]`` and are created when
        missing.

        Returns:
            Path of the report file in the report folder.
        """
        output_dir = Path(self.config["output"]["report_dir"])
        output_dir.mkdir(parents=True, exist_ok=True)
        report_file = output_dir / f"日报_{self.timestamp}.txt"
        with open(report_file, "w", encoding="utf-8") as f:
            f.write(report_content)

        backup_dir = Path(self.config["output"]["backup_dir"])
        backup_dir.mkdir(parents=True, exist_ok=True)
        backup_file = backup_dir / f"日报_{self.timestamp}.txt"
        shutil.copy(report_file, backup_file)

        print(f"报告已保存: {report_file}")
        return report_file

    def send_email(self, report_content, attachments=None):
        """Mail the report as plain text, with optional attachments.

        Args:
            report_content: Body of the message.
            attachments: Optional iterable of file paths; paths that do not
                exist are skipped.

        Returns:
            ``True`` when the message was handed to the server, ``False``
            when sending failed (the error is printed).
        """
        print("正在发送邮件...")
        email_config = self.config["email"]

        msg = MIMEMultipart()
        msg['From'] = email_config["sender"]
        msg['To'] = ", ".join(email_config["receivers"])
        msg['Subject'] = f"工作日报 - {self.timestamp}"
        msg.attach(MIMEText(report_content, 'plain'))

        if attachments:
            for attachment_path in attachments:
                if Path(attachment_path).exists():
                    with open(attachment_path, "rb") as f:
                        part = MIMEApplication(f.read())
                    part.add_header('Content-Disposition', 'attachment',
                                    filename=Path(attachment_path).name)
                    msg.attach(part)

        try:
            # The context manager closes the connection even when login or
            # sendmail raises.
            with smtplib.SMTP_SSL(email_config["smtp_server"], email_config["smtp_port"]) as server:
                server.login(email_config["sender"], email_config["password"])
                server.sendmail(email_config["sender"], email_config["receivers"], msg.as_string())
            print("邮件发送成功!")
            return True
        except Exception as e:
            print(f"邮件发送失败: {e}")
            return False

    def run(self):
        """Run collect -> generate -> save -> send once.

        Returns:
            ``True`` when the report was mailed, ``False`` when mailing or
            any earlier step failed.
        """
        # Refresh the date on every run: when one instance is scheduled to
        # run daily, the date taken at construction time would otherwise be
        # reused on later days.
        self.timestamp = datetime.now().strftime("%Y-%m-%d")

        print(f"\n{'='*50}")
        print(f"开始执行日报自动化 - {self.timestamp}")
        print(f"{'='*50}\n")
        try:
            sales_df, tasks_df = self.collect_data()
            report_content = self.generate_report(sales_df, tasks_df)
            print("\n报告预览:")
            print("-" * 30)
            print(report_content)
            print("-" * 30)
            report_file = self.save_report(report_content)
            success = self.send_email(report_content, attachments=[report_file])
            if success:
                print("\n✅ 日报自动化完成!")
            else:
                print("\n⚠️ 日报生成完成,但邮件发送失败")
            return success
        except Exception as e:
            print(f"\n❌ 执行失败: {e}")
            return False
# 使用示例
if __name__ == "__main__":
# 创建自动化实例
automation = DailyReportAutomation()
# 运行一次
automation.run()
# 如果需要定时运行,可以结合schedule
# schedule.every().day.at("18:00").do(automation.run)
# while True:
# schedule.run_pending()
# time.sleep(1)
第十部分:错误处理与日志记录
10.1 完善的错误处理机制
import logging
from pathlib import Path
from datetime import datetime
def setup_logging():
    """Set up logging to a dated file under ``logs/`` and to the console.

    The ``logs`` folder is created if needed and the file is named
    ``automation_<YYYYmmdd>.log``. Records are configured at INFO level with
    a ``time - level - message`` layout.

    Returns:
        The logger named after this module.
    """
    logs_root = Path("logs")
    logs_root.mkdir(exist_ok=True)

    day_stamp = datetime.now().strftime('%Y%m%d')
    destinations = [
        logging.FileHandler(logs_root / f"automation_{day_stamp}.log", encoding='utf-8'),
        logging.StreamHandler(),  # console output
    ]

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=destinations,
    )
    return logging.getLogger(__name__)
# 使用日志
logger = setup_logging()
def safe_file_operation():
    """Read a fixed text file, logging each outcome instead of raising.

    Returns:
        ``True`` when the file was read; ``False`` when it does not exist,
        access is denied, or any other error occurs. Unexpected errors are
        logged together with their traceback.
    """
    try:
        logger.info("开始文件操作")
        target = Path("C:/Data/重要文件.txt")

        if target.exists():
            content = target.read_text(encoding='utf-8')
            logger.info(f"成功读取文件,内容长度: {len(content)}")
            return True

        logger.warning(f"文件不存在: {target}")
        return False
    except PermissionError:
        logger.error("权限不足,无法访问文件")
        return False
    except Exception as err:
        # logger.exception logs at ERROR level with the traceback attached.
        logger.exception(f"未知错误: {err}")
        return False
第十一部分:性能优化与最佳实践
11.1 代码优化技巧
# 1. 使用列表推导式替代循环
def process_files_slow(file_list):
    """Loop-based version: return the upper-cased names that end in '.txt'.

    Input order is preserved; names with any other ending are dropped.
    """
    uppercased = []
    for name in file_list:
        if not name.endswith('.txt'):
            continue
        uppercased.append(name.upper())
    return uppercased
def process_files_fast(file_list):
    """Comprehension-based version: return the upper-cased names ending in '.txt'.

    Input order is preserved; names with any other ending are dropped.
    """
    txt_names = (name for name in file_list if name.endswith('.txt'))
    return [name.upper() for name in txt_names]
# 2. 使用生成器处理大文件
def read_large_file(file_path):
    """Yield the lines of a UTF-8 text file one at a time, stripped.

    The file is read lazily, so it is never loaded into memory as a whole.
    Leading and trailing whitespace, including the newline, is removed from
    every line.
    """
    with open(file_path, 'r', encoding='utf-8') as handle:
        yield from map(str.strip, handle)
# 3. 使用多进程处理CPU密集任务
from multiprocessing import Pool
import os
def process_data_chunk(chunk):
    """Return a list holding the square of every element of *chunk*."""
    squares = []
    for value in chunk:
        squares.append(value ** 2)
    return squares
def parallel_processing(data=None):
    """Square a list of numbers in parallel worker processes.

    Args:
        data: Values to process. Defaults to ``range(1000000)`` as a list.

    Returns:
        A list of result lists, one per chunk, in chunk order. Empty input
        returns an empty list without starting any process.

    The input is cut into roughly one chunk per CPU and each chunk is handed
    to ``process_data_chunk`` in a process pool.
    """
    if data is None:
        data = list(range(1000000))
    if not data:
        return []

    # os.cpu_count() may return None; and the chunk size must be at least 1,
    # otherwise range() below gets a zero step when there are fewer items
    # than CPUs.
    worker_count = os.cpu_count() or 1
    chunk_size = max(1, len(data) // worker_count)
    chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

    with Pool() as pool:
        results = pool.map(process_data_chunk, chunks)
    return results
第十二部分:部署与维护
12.1 创建可执行文件
# 使用PyInstaller创建独立可执行文件
# 安装: pip install pyinstaller
# 命令行执行:
"""
pyinstaller --onefile --windowed --name="DailyReport" daily_report.py
"""
# 创建配置文件 daily_report.spec
"""
# -*- mode: python ; coding: utf-8 -*-
block_cipher = None
a = Analysis(
['daily_report.py'],
pathex=[],
binaries=[],
datas=[('config.json', '.')],
hiddenimports=[],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[],
win_no_prefer_redirects=False,
win_private_assemblies=False,
cipher=block_cipher,
noarchive=False,
)
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.zipfiles,
a.datas,
[],
name='DailyReport',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
console=False,
disable_windowed_traceback=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
)
"""
总结与进阶学习
通过本文的学习,你已经掌握了Python自动化的核心技能:
- 文件操作:使用pathlib进行现代化的文件管理
- Excel处理:使用openpyxl和pandas处理数据
- PDF处理:合并、拆分和提取PDF内容
- 网络自动化:API调用和网页爬虫
- 邮件自动化:发送邮件和自动回复
- GUI自动化:控制鼠标键盘
- 定时任务:自动化工作流
- 综合案例:完整的日报系统
进阶学习方向
- 异步编程:使用asyncio提高IO密集型任务的效率
- Web框架:使用Flask或FastAPI创建自动化服务
- 数据库集成:使用SQLAlchemy管理数据
- 云部署:将自动化脚本部署到云端
- 监控告警:集成Prometheus和Grafana
最佳实践建议
- 始终备份重要数据:自动化操作前确保有备份
- 充分测试:在测试环境验证后再部署到生产环境
- 记录日志:便于问题排查和审计
- 优雅降级:处理异常情况,避免程序崩溃
- 定期维护:更新依赖库,优化代码
Python自动化是一个持续学习的过程。建议从简单的任务开始,逐步构建复杂的自动化系统。记住,自动化的目的是节省时间,而不是增加复杂度。
开始你的自动化之旅吧!如果遇到问题,Python社区和Stack Overflow都是很好的求助资源。祝你自动化愉快!
