用Python自动化枯燥的工作 第8篇:定时任务与系统自动化
摘要
本文是本系列的最后一篇,将带你深入理解定时任务调度和系统自动化的核心技术。你将学到时间与定时任务的处理、进程控制与系统命令执行、图像识别与OCR技术、键盘鼠标自动化,以及综合自动化项目的实战应用。掌握这些技术,你将能够构建真正解放双手的自动化系统。
学习目标
阅读完本文后,你将能够:
- 任务调度:能够使用schedule和APScheduler设置定时任务,实现周期性自动化
- 进程控制:能够使用subprocess执行系统命令,管理外部进程
- 图像识别:能够使用OCR技术识别图片中的文字,自动化处理图像
- RPA自动化:能够使用PyAutoGUI控制键盘鼠标,实现界面操作自动化
- 综合应用:能够构建完整的自动化项目,整合多种技术实现复杂自动化
一、时间与定时任务:让程序按计划执行
1.1 Python时间处理基础
时间处理是定时任务的基础,Python提供了丰富的时间日期处理工具。
import time
from datetime import datetime, timedelta
# time模块 - 时间戳
timestamp = time.time()
print(f"当前时间戳: {timestamp}")
# 格式化时间
formatted = time.strftime("%Y-%m-%d %H:%M:%S")
print(f"格式化时间: {formatted}")
# 程序暂停
print("等待3秒...")
time.sleep(3)
print("继续执行")
# datetime模块 - 日期时间
now = datetime.now()
print(f"当前时间: {now}")
print(f"年份: {now.year}")
print(f"月份: {now.month}")
print(f"日期: {now.day}")
# 时间计算
tomorrow = now + timedelta(days=1)
next_week = now + timedelta(weeks=1)
one_hour_later = now + timedelta(hours=1)
print(f"明天: {tomorrow}")
print(f"下周: {next_week}")1.2 使用schedule库调度任务
schedule库提供了简单直观的API来设置定时任务。
import schedule
import time
def job():
"""要执行的任务"""
print(f"执行任务: {datetime.now()}")
def job_with_args(name):
"""带参数的任务"""
print(f"Hello {name}! - {datetime.now()}")
# 每10分钟执行一次
schedule.every(10).minutes.do(job)
# 每小时执行一次
schedule.every().hour.do(job)
# 每天早上9点执行
schedule.every().day.at("09:00").do(job)
# 每周一执行
schedule.every().monday.do(job)
# 每2小时执行一次
schedule.every(2).hours.do(job)
# 每30秒执行一次
schedule.every(30).seconds.do(job_with_args, "Alice")
# 运行调度器
print("调度器已启动,按Ctrl+C停止")
try:
while True:
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
print("\n调度器已停止")1.3 使用APScheduler高级调度
APScheduler是功能更强大的任务调度库,支持持久化、分布式等高级特性。
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime
def scheduled_job():
"""定时任务"""
print(f"执行任务: {datetime.now()}")
def daily_backup():
"""每日备份任务"""
print(f"执行每日备份: {datetime.now()}")
# 创建调度器
scheduler = BlockingScheduler()
# 添加任务 - 间隔触发
scheduler.add_job(
scheduled_job,
'interval',
seconds=30,
id='interval_job',
name='每30秒执行'
)
# 添加任务 - Cron表达式
scheduler.add_job(
daily_backup,
CronTrigger.from_crontab('0 2 * * *'), # 每天凌晨2点
id='daily_backup',
name='每日备份'
)
# 添加任务 - 一次性任务
scheduler.add_job(
scheduled_job,
'date',
run_date=datetime(2024, 12, 31, 23, 59, 59),
id='year_end',
name='年底任务'
)
print("调度器已启动")
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
print("\n调度器已停止")下面的序列图展示了定时任务调度的工作机制:
sequenceDiagram participant Main as 主程序 participant Scheduler as 调度器 participant Queue as 任务队列 participant Worker as 工作线程 participant Job as 任务函数 Note over Main,Job: 初始化阶段 Main->>Scheduler: 创建调度器 Main->>Scheduler: 添加定时任务 Note right of Scheduler: 任务1: 每10分钟<br/>任务2: 每天早上9点<br/>任务3: Cron表达式 Scheduler->>Queue: 将任务加入队列 Note over Main,Job: 运行阶段 Main->>Scheduler: 启动调度器 Scheduler->>Scheduler: 检查任务触发时间 loop 每秒检查 Scheduler->>Scheduler: 当前时间: 09:00:00 Scheduler->>Queue: 有任务到期? Queue-->>Scheduler: 任务2到期 Scheduler->>Worker: 提交任务到工作线程 Worker->>Job: 执行任务函数 Job-->>Worker: 返回结果 Worker-->>Scheduler: 任务完成 Scheduler->>Scheduler: 计算下次执行时间 Scheduler->>Queue: 更新任务队列 end Note over Main,Job: 持续运行...
图表讲解:这个序列图展示了任务调度器从初始化到持续运行的完整流程,理解这个流程有助于掌握定时任务的执行机制。
初始化阶段(绿色区域):主程序创建调度器对象,配置调度器类型(BlockingScheduler阻塞式、BackgroundScheduler后台式)。然后添加定时任务,每个任务包含:执行函数(要运行的代码)、触发器(何时触发)、任务ID、任务名称等。触发器类型包括interval(固定间隔)、date(特定时间)、cron(复杂定时规则)。调度器将任务加入内部队列,按照触发时间排序。
运行阶段(黄色循环):主程序启动调度器,调度器开始循环执行。每次循环检查当前时间,对比任务队列中的任务触发时间。
如果有任务到期(蓝色区域):从队列取出到期任务,提交到工作线程执行。工作线程调用任务函数,执行实际的业务逻辑(如发送邮件、生成报表、备份数据等)。任务执行完成后,工作线程通知调度器。调度器根据任务的触发规则计算下次执行时间,更新任务队列。
这个过程无限循环(黄色区域),直到程序被手动停止或系统关闭。调度器在循环间隔内(通常1秒)检查所有任务,确保不会遗漏任何应该执行的任务。
理解这个流程,可以设计复杂的定时任务系统,如每天早上8点发送报告、每小时检查新文件、每周末执行清理等。APScheduler还支持任务持久化(重启后恢复)、分布式调度(多实例协调)、错过任务补偿(机器重启后补执行)等高级功能。
二、进程控制与系统命令:集成外部工具
2.1 使用subprocess执行命令
subprocess模块是Python执行系统命令的标准方式。
import subprocess
# 简单命令执行
result = subprocess.run(['ls', '-l'], capture_output=True, text=True)
print(result.stdout)
# 带错误处理的命令执行
try:
result = subprocess.run(
['grep', 'pattern', 'file.txt'],
check=True, # 非零退出码抛出异常
capture_output=True,
text=True
)
print(result.stdout)
except subprocess.CalledProcessError as e:
print(f"命令执行失败: {e}")
print(f"错误输出: {e.stderr}")
# 实时获取输出
process = subprocess.Popen(
['ping', '-c', '4', 'example.com'],
stdout=subprocess.PIPE,
text=True
)
for line in process.stdout:
print(line.strip())
process.wait()2.2 管道和重定向
import subprocess
# 管道连接多个命令
# 相当于: cat file.txt | grep "pattern" | wc -l
p1 = subprocess.Popen(['cat', 'file.txt'], stdout=subprocess.PIPE)
p2 = subprocess.Popen(['grep', 'pattern'], stdin=p1.stdout, stdout=subprocess.PIPE)
p3 = subprocess.Popen(['wc', '-l'], stdin=p2.stdout, stdout=subprocess.PIPE)
p1.stdout.close() # 允许p1接收SIGPIPE
p2.stdout.close()
output = p3.communicate()[0]
print(f"匹配行数: {output.decode().strip()}")
# 输入输出重定向
with open('input.txt', 'r') as f_in:
with open('output.txt', 'w') as f_out:
subprocess.run(['python', 'process.py'],
stdin=f_in,
stdout=f_out)2.3 进程管理
import subprocess
import time
import signal
def run_with_timeout(command, timeout=10):
"""运行命令,带超时控制"""
try:
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
# 等待进程完成或超时
stdout, stderr = process.communicate(timeout=timeout)
if process.returncode == 0:
print("命令执行成功")
print(stdout.decode())
else:
print(f"命令失败,退出码: {process.returncode}")
print(stderr.decode())
except subprocess.TimeoutExpired:
print(f"命令超时({timeout}秒),正在终止...")
process.kill()
stdout, stderr = process.communicate()
print("进程已终止")
# 使用示例
run_with_timeout(['sleep', '5'], timeout=3)
# 异步运行进程
def run_async_command(command):
"""异步运行命令"""
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
# 进程在后台运行
print(f"进程ID: {process.pid}")
print("进程在后台运行...")
# 可以做其他事情
time.sleep(2)
# 检查进程状态
poll_result = process.poll()
if poll_result is None:
print("进程仍在运行")
process.terminate() # 优雅终止
elif poll_result == 0:
print("进程已成功完成")
else:
print(f"进程异常退出: {poll_result}")下面的状态图展示了进程生命周期的各个状态:
stateDiagram-v2 [*] --> 创建: Popen() 创建 --> 运行中: 进程启动 运行中 --> 等待: 等待输入/输出 等待 --> 运行中: I/O完成 运行中 --> 已完成: 进程正常退出 运行中 --> 已终止: 进程被终止 运行中 --> 已停止: 进程收到停止信号 已停止 --> 运行中: 收到继续信号 已停止 --> 已终止: 收到终止信号 已完成 --> [*] 已终止 --> [*] note right of 创建 进程被创建但 尚未开始执行 end note note right of 运行中 进程正在执行 可能处于等待状态 end note note right of 已完成 进程正常退出 returncode = 0 end note note right of 已终止 进程被kill或 异常退出 returncode != 0 end note
图表讲解:这个状态图展示了外部进程从创建到结束的完整生命周期,理解这些状态有助于有效管理子进程。
进程从创建状态开始(绿色区域):使用subprocess.Popen()创建进程对象,此时进程已被分配资源但尚未开始执行。这是准备阶段,可以设置各种属性如环境变量、工作目录、输入输出重定向等。
创建后进入运行中状态(黄色区域):进程正在执行程序代码。在执行过程中,进程可能需要等待输入/输出操作(如读取文件、网络请求),此时进入等待状态(蓝色区域)。I/O完成后返回运行中状态。运行中和等待之间的转换可能发生多次。
进程最终会结束,有三种可能的结束状态(红色区域):正常完成、被终止、被停止。
正常完成状态:进程执行完毕,调用exit()或从main函数返回,退出码为0。这是期望的结束方式,表示任务成功完成。
被终止状态:进程被kill()强制终止,或因严重错误(如段错误)异常退出,退出码非0。这种终止方式是突然的,进程来不及清理资源。terminate()发送SIGTERM信号,给进程优雅退出的机会;kill()发送SIGKILL信号,立即终止。
被停止状态:进程收到SIGSTOP信号暂停执行(如按下Ctrl+Z),可以被SIGCONT信号恢复到运行中状态,也可以被终止信号结束。这个状态用于调试进程。
理解这些状态,可以有效管理子进程:设置合理的超时避免进程挂起、正确等待进程结束获取输出、必要时优雅终止进程、处理各种异常退出情况。
三、图像识别与OCR:从图片中提取信息
3.1 OCR基础概念
OCR(Optical Character Recognition,光学字符识别)是从图像中提取文字的技术,在自动化中有广泛应用。
3.2 使用Tesseract OCR
import pytesseract
from PIL import Image
# 基本文字识别
def ocr_image(image_path):
"""从图像中识别文字"""
image = Image.open(image_path)
# 识别文字
text = pytesseract.image_to_string(image, lang='chi_sim+eng')
return text
# 识别结果
text = ocr_image('screenshot.png')
print(text)
# 获取详细信息
def ocr_with_details(image_path):
"""获取详细的OCR结果"""
image = Image.open(image_path)
# 获取边界框信息
data = pytesseract.image_to_data(image, output_type=pytesseract.Output.DICT)
for i, text in enumerate(data['text']):
confidence = int(data['conf'][i])
if confidence > 60: # 置信度过滤
x, y, w, h = data['left'][i], data['top'][i], data['width'][i], data['height'][i]
print(f"文字: {text}, 位置: ({x},{y}), 大小: {w}x{h}, 置信度: {confidence}")
# 使用示例
# ocr_with_details('document.png')3.3 预处理提高识别率
from PIL import Image, ImageEnhance, ImageFilter
def preprocess_image(image_path):
"""预处理图像以提高OCR准确率"""
image = Image.open(image_path)
# 转换为灰度图
image = image.convert('L')
# 调整对比度
enhancer = ImageEnhance.Contrast(image)
image = enhancer.enhance(2.0)
# 调整亮度
enhancer = ImageEnhance.Brightness(image)
image = enhancer.enhance(1.2)
# 去噪
image = image.filter(ImageFilter.MedianFilter())
# 二值化
threshold = 127
image = image.point(lambda x: 0 if x < threshold else 255, '1')
return image
# 使用预处理
processed_image = preprocess_image('screenshot.png')
text = pytesseract.image_to_string(processed_image, lang='eng')
print(text)3.4 屏幕截图自动化识别
import pyautogui
import pytesseract
from PIL import Image
import time
def screenshot_and_ocr(region=None):
"""截屏并OCR识别"""
# 截图
if region:
# region = (x, y, width, height)
screenshot = pyautogui.screenshot(region=region)
else:
screenshot = pyautogui.screenshot()
# OCR识别
text = pytesseract.image_to_string(screenshot, lang='chi_sim+eng')
return text
# 监控屏幕区域变化
def monitor_screen_region(region, check_interval=5):
"""监控屏幕区域"""
print(f"开始监控屏幕区域: {region}")
last_text = None
while True:
try:
current_text = screenshot_and_ocr(region)
if current_text != last_text:
print(f"\n检测到变化 ({time.strftime('%H:%M:%S')}):")
print(current_text)
last_text = current_text
time.sleep(check_interval)
except KeyboardInterrupt:
print("\n监控已停止")
break
# 使用示例
# monitor_screen_region((100, 100, 400, 200), check_interval=3)下面的流程图展示了OCR处理的完整流程:
flowchart TD A[获取图像] --> B[图像预处理] B --> C{需要什么处理?} C -->|灰度化| D[转换灰度图] C -->|降噪| E[应用滤镜] C -->|二值化| F[阈值转换] C -->|旋转| G[校正倾斜] D --> H[增强对比度] E --> H F --> H G --> H H --> I[OCR引擎识别] I --> J[提取文字内容] J --> K{后处理需求?} K -->|置信度过滤| L[过滤低置信度] K -->|正则提取| M[提取特定模式] K -->|文本修正| N[拼写检查] L --> O[输出结果] M --> O N --> O O --> P[保存或使用] style A fill:#e1f5e1 style P fill:#e1f5ff style C fill:#fff5e1 style K fill:#fff5e1
图表讲解:这个流程图展示了OCR从图像输入到文字输出的完整处理流程,每个步骤都对最终识别效果有重要影响。
获取图像(绿色区域):这是OCR的起点,图像来源包括文件读取、屏幕截图、扫描仪输入、相机拍摄等。图像质量直接影响后续识别效果,因此应尽可能获取清晰、高分辨率的图像。
图像预处理(黄色决策):原始图像往往需要预处理才能获得好的识别效果。根据图像特点选择合适的处理方式(黄色决策):灰度化将彩色图像转换为灰度,减少计算复杂度;降噪去除图像中的噪点,使用中值滤波、高斯滤波等方法;二值化将灰度图像转换为黑白两色,突出文字轮廓;旋转校正图像的倾斜角度,使文字水平对齐。
增强处理(蓝色区域):对预处理后的图像进行进一步增强。调整对比度使文字和背景区分更明显,调整亮度使图像层次更清晰。这些增强操作根据实际图像质量选择使用。
OCR引擎识别(绿色区域):Tesseract等OCR引擎分析图像,识别字符边界,提取特征,匹配字符库,最终输出文字内容。识别过程包含字符定位、字符分割、字符识别、语言分析等多个阶段。
后处理(黄色决策):原始OCR结果往往需要后处理才能满足实际需求。置信度过滤(黄色决策)去除识别不确定的文字(低于阈值);正则提取从长文本中提取特定模式(如日期、电话、邮箱);文本修正使用拼写检查、词典查找等技术修正识别错误。
输出结果(绿色区域):将处理后的文字保存到文件或直接使用。保存格式包括纯文本、结构化数据(JSON、XML)、带坐标信息的标注数据等。
理解这个流程,可以根据具体应用场景优化各个环节。比如对扫描文档重点做二值化,对照片重点做降噪,对验证码识别重点做字符分割。优化后的OCR准确率会显著提高。
四、键盘鼠标自动化:RPA机器人流程自动化
4.1 PyAutoGUI基础
PyAutoGUI是Python中最流行的键盘鼠标自动化库,可以实现RPA(机器人流程自动化)。
import pyautogui
import time
# 获取屏幕尺寸
screen_width, screen_height = pyautogui.size()
print(f"屏幕尺寸: {screen_width}x{screen_height}")
# 获取鼠标位置
current_x, current_y = pyautogui.position()
print(f"当前鼠标位置: ({current_x}, {current_y})")
# 移动鼠标
pyautogui.moveTo(100, 100, duration=0.5) # 移动到(100,100),耗时0.5秒
pyautogui.moveRel(50, 0, duration=0.3) # 相对移动右移50像素
# 点击鼠标
pyautogui.click(x=200, y=200) # 在指定位置点击左键
pyautogui.rightClick(x=200, y=200) # 右键点击
pyautogui.doubleClick(x=200, y=200) # 双击
pyautogui.middleClick(x=200, y=200) # 中键点击
# 拖拽
pyautogui.dragTo(300, 300, duration=1) # 拖拽到指定位置
pyautogui.dragRel(100, 0, duration=0.5) # 相对拖拽
# 滚动鼠标滚轮
pyautogui.scroll(10) # 向上滚动10个单位
pyautogui.scroll(-10) # 向下滚动10个单位4.2 键盘自动化
import pyautogui
import time
# 输入文字
pyautogui.click(500, 300) # 先点击输入框
pyautogui.write('Hello World!', interval=0.05) # 每个字符间隔0.05秒
# 特殊按键
pyautogui.press('enter') # 按下回车
pyautogui.press('tab') # 按下Tab
pyautogui.press('space') # 按下空格
# 组合键
pyautogui.hotkey('ctrl', 'c') # 复制 (Ctrl+C)
pyautogui.hotkey('ctrl', 'v') # 粘贴 (Ctrl+V)
pyautogui.hotkey('ctrl', 'a') # 全选 (Ctrl+A)
pyautogui.hotkey('ctrl', 's') # 保存 (Ctrl+S)
# 输入带特殊字符的文本
pyautogui.write('Hello\nWorld') # \n会被识别为回车
pyautogui.write('Hello', pause=0.1) # 每个字符暂停0.1秒
# 按键序列
pyautogui.press(['shift', 'a', 'b', 'c']) # Shift+A, B, C
pyautogui.keyUp('shift') # 释放Shift键4.3 截图和图像识别
import pyautogui
import time
# 截取整个屏幕
screenshot = pyautogui.screenshot()
screenshot.save('full_screen.png')
# 截取指定区域
region_screenshot = pyautogui.screenshot(region=(100, 100, 400, 300))
region_screenshot.save('region.png')
# 查找图像位置
button_location = pyautogui.locateOnScreen('button.png')
if button_location:
print(f"找到按钮: {button_location}")
center = pyautogui.center(button_location)
pyautogui.click(center)
else:
print("未找到按钮")
# 查找所有匹配图像
all_matches = pyautogui.locateAllOnScreen('icon.png')
for match in all_matches:
center = pyautogui.center(match)
print(f"找到图标: {center}")
# 等待图像出现
try:
location = pyautogui.locateOnScreen('confirm_button.png', confidence=0.9)
if location:
center = pyautogui.center(location)
pyautogui.click(center)
except pyautogui.ImageNotFoundException:
print("未找到确认按钮")4.4 安全机制
import pyautogui
# 设置故障保护
pyautogui.FAILSAFE = True # 启用(默认)
# 如果鼠标移动到屏幕左上角,会触发FailSafeException
# 暂停机制
pyautogui.PAUSE = 0.5 # 每个PyAutoGUI调用后暂停0.5秒
# 安全的自动化函数
def safe_automation():
"""带安全检查的自动化"""
try:
# 移动鼠标到安全位置开始
pyautogui.moveTo(screen_width / 2, screen_height / 2)
# 执行自动化操作
for i in range(10):
# 检查是否需要中断
if pyautogui.position()[0] < 10 and pyautogui.position()[1] < 10:
print("触发安全中断")
break
# 执行操作
pyautogui.click(100 + i * 50, 200)
time.sleep(0.5)
except pyautogui.FailSafeException:
print("FailSafe触发,操作已中止")下面的序列图展示了RPA自动化的执行流程:
sequenceDiagram participant Script as 自动化脚本 participant GUI as PyAutoGUI participant Screen as 屏幕显示 participant App as 目标应用 Note over Script,App: 启动自动化 Script->>GUI: 查找目标图像 GUI->>Screen: 截取屏幕 Screen-->>GUI: 返回屏幕截图 GUI-->>Script: 返回图像位置 alt 找到目标 Script->>GUI: 移动鼠标到目标 GUI->>Screen: 更新鼠标位置 Script->>GUI: 点击鼠标 GUI->>App: 发送点击事件 App->>Screen: 更新界面显示 Note over Script,App: 等待应用响应 Script->>GUI: 获取新截图 GUI->>Screen: 截取屏幕 Screen-->>GUI: 返回截图 Script->>Script: 验证操作结果 alt 验证成功 Script->>GUI: 输入文字 GUI->>App: 发送键盘事件 else 验证失败 Script->>Script: 记录错误或重试 end else 未找到目标 Script->>Script: 记录失败 end Note over Script,App: 继续或结束
图表讲解:这个序列图展示了RPA自动化的典型执行流程,包含图像识别、鼠标操作、键盘输入、结果验证等环节。
启动自动化阶段:脚本首先调用PyAutoGUI查找目标图像(如按钮、图标)。PyAutoGUI截取当前屏幕,与目标图像进行匹配比较,返回找到的位置坐标。如果找不到目标,脚本记录失败并可能重试或报错。
如果找到目标(蓝色区域):脚本移动鼠标到目标位置,PyAutoGUI更新屏幕上的鼠标指针。然后执行鼠标点击,PyAutoGUI向操作系统发送点击事件,操作系统将事件传递给目标应用。目标应用处理点击事件,可能打开菜单、激活按钮等操作,界面发生变化。
等待应用响应(黄色区域):由于应用处理需要时间,脚本需要等待一段时间。这个延迟可以通过固定时间、等待特定图像出现、等待特定文字出现等方式实现。
获取新截图验证结果:脚本再次截取屏幕,检查操作是否成功。例如,点击”保存”按钮后,检查是否出现”保存成功”的提示。验证可以基于图像匹配或文字识别。
如果验证成功(绿色区域):继续下一步操作,如输入文字、点击下一个按钮等。PyAutoGUI模拟键盘输入,发送按键事件给目标应用,应用接收输入并更新界面。
如果验证失败(红色区域):记录错误、尝试重试、或中止流程。这是RPA健壮性的重要部分,确保在意外情况下能够优雅处理而不是卡死。
理解这个流程,可以设计可靠的RPA自动化脚本,处理各种异常情况,实现真正的”无人值守”自动化。但要注意,RPA依赖于界面稳定性,如果目标应用界面变化频繁,维护成本会很高。
五、实战案例:自动化备份系统
5.1 项目概述
构建一个完整的自动化备份系统,具有以下功能:
- 定时扫描指定目录
- 检测新增和修改的文件
- 自动备份到目标位置
- 发送完成通知
- 生成备份报告
5.2 完整实现
import os
import shutil
import hashlib
import json
from pathlib import Path
from datetime import datetime
import schedule
import time
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
class BackupSystem:
"""自动化备份系统"""
def __init__(self, config_file='backup_config.json'):
"""初始化备份系统"""
self.config = self._load_config(config_file)
self.source_dir = Path(self.config['source_dir'])
self.backup_dir = Path(self.config['backup_dir'])
self.backup_dir.mkdir(exist_ok=True)
# 数据库文件:记录已备份文件的哈希值
self.db_file = self.backup_dir / 'backup_database.json'
self.database = self._load_database()
def _load_config(self, config_file):
"""加载配置文件"""
default_config = {
'source_dir': './data',
'backup_dir': './backup',
'schedule': '02:00', # 每天凌晨2点
'email': {
'enabled': False,
'sender': '[email protected]',
'password': 'your_password',
'recipient': '[email protected]'
}
}
try:
with open(config_file, 'r') as f:
config = json.load(f)
default_config.update(config)
except FileNotFoundError:
# 创建默认配置文件
with open(config_file, 'w') as f:
json.dump(default_config, f, indent=2)
return default_config
def _load_database(self):
"""加载备份记录"""
if self.db_file.exists():
with open(self.db_file, 'r') as f:
return json.load(f)
return {}
def _save_database(self):
"""保存备份记录"""
with open(self.db_file, 'w') as f:
json.dump(self.database, f, indent=2)
def _calculate_hash(self, filepath):
"""计算文件的MD5哈希值"""
hash_md5 = hashlib.md5()
with open(filepath, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def _needs_backup(self, filepath):
"""判断文件是否需要备份"""
relative_path = str(filepath.relative_to(self.source_dir))
current_hash = self._calculate_hash(filepath)
if relative_path not in self.database:
return True, '新文件'
if self.database[relative_path]['hash'] != current_hash:
return True, '已修改'
return False, '无需备份'
def backup_file(self, source_file):
"""备份单个文件"""
relative_path = source_file.relative_to(self.source_dir)
backup_file = self.backup_dir / relative_path
# 创建目标目录
backup_file.parent.mkdir(parents=True, exist_ok=True)
# 复制文件
shutil.copy2(source_file, backup_file)
# 更新数据库
file_hash = self._calculate_hash(source_file)
relative_path_str = str(relative_path)
self.database[relative_path_str] = {
'hash': file_hash,
'backup_time': datetime.now().isoformat(),
'size': source_file.stat().st_size
}
return backup_file
def scan_and_backup(self):
"""扫描并备份文件"""
print(f"\n{'='*50}")
print(f"开始备份: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"{'='*50}\n")
backup_stats = {
'new': 0,
'modified': 0,
'skipped': 0,
'errors': 0,
'total_size': 0
}
# 扫描源目录
for filepath in self.source_dir.rglob('*'):
if filepath.is_file():
try:
needs_backup, reason = self._needs_backup(filepath)
if needs_backup:
print(f"备份 [{reason}]: {filepath}")
backup_file = self.backup_file(filepath)
backup_stats[reason == '新文件' and 'new' or 'modified'] += 1
backup_stats['total_size'] += filepath.stat().st_size
else:
backup_stats['skipped'] += 1
except Exception as e:
print(f"错误: {filepath} - {e}")
backup_stats['errors'] += 1
# 保存数据库
self._save_database()
# 打印统计
print(f"\n备份完成:")
print(f" 新文件: {backup_stats['new']}")
print(f" 已修改: {backup_stats['modified']}")
print(f" 已跳过: {backup_stats['skipped']}")
print(f" 错误: {backup_stats['errors']}")
print(f" 总大小: {backup_stats['total_size'] / 1024 / 1024:.2f} MB")
# 发送通知
if self.config['email']['enabled']:
self._send_notification(backup_stats)
return backup_stats
def _send_notification(self, stats):
"""发送备份完成通知"""
sender = self.config['email']['sender']
password = self.config['email']['password']
recipient = self.config['email']['recipient']
subject = f"备份完成 - {datetime.now().strftime('%Y-%m-%d %H:%M')}"
body = f"""
备份操作已完成。
统计信息:
- 新文件: {stats['new']}
- 已修改: {stats['modified']}
- 已跳过: {stats['skipped']}
- 错误: {stats['errors']}
- 总大小: {stats['total_size'] / 1024 / 1024:.2f} MB
备份时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
""".strip()
try:
msg = MIMEMultipart()
msg['From'] = sender
msg['To'] = recipient
msg['Subject'] = subject
msg.attach(MIMEText(body, 'plain'))
server = smtplib.SMTP('smtp.gmail.com', 587)
server.starttls()
server.login(sender, password)
server.send_message(msg)
server.quit()
print("通知邮件已发送")
except Exception as e:
print(f"发送邮件失败: {e}")
def start(self):
"""启动定时备份"""
schedule_time = self.config['schedule']
schedule.every().day.at(schedule_time).do(self.scan_and_backup)
print(f"备份系统已启动")
print(f"计划时间: 每天 {schedule_time}")
print(f"源目录: {self.source_dir}")
print(f"备份目录: {self.backup_dir}")
print("按Ctrl+C停止\n")
try:
while True:
schedule.run_pending()
time.sleep(60)
except KeyboardInterrupt:
print("\n备份系统已停止")
# 使用示例
if __name__ == "__main__":
import sys
# 如果有命令行参数,立即执行一次备份
if len(sys.argv) > 1 and sys.argv[1] == '--now':
backup = BackupSystem()
backup.scan_and_backup()
else:
# 否则启动定时备份
backup = BackupSystem()
backup.start()下面的流程图展示了自动化备份系统的完整工作流程:
flowchart TD A[定时任务触发] --> B[扫描源目录] B --> C[遍历所有文件] C --> D{还有文件?} D -->|否| M[保存数据库] D -->|是| E[计算文件哈希] E --> F{检查是否变化} F -->|新文件| G[标记为新文件] F -->|已修改| H[标记为已修改] F -->|无变化| I[跳过] G --> J[复制到备份目录] H --> J I --> K[增加跳过计数] J --> L[更新哈希记录] K --> C L --> C M --> N[生成统计报告] N --> O{启用邮件通知?} O -->|是| P[发送通知邮件] O -->|否| Q[输出到控制台] P --> Q Q --> R[备份完成] style A fill:#e1f5e1 style R fill:#e1f5ff style F fill:#fff5e1 style O fill:#fff5e1 style J fill:#cffcfc
图表讲解:这个流程图展示了自动化备份系统从触发到完成的完整流程,这是一个典型的定时文件同步系统。
定时任务触发(绿色区域):根据配置的时间表(如每天凌晨2点),备份任务被触发。使用schedule或APScheduler实现定时调度。
扫描源目录(黄色决策):遍历源目录下的所有文件,使用rglob('*')递归获取所有文件。对每个文件进行处理(黄色决策)。
计算文件哈希值(蓝色区域):使用MD5算法计算文件的哈希值。哈希值是文件内容的唯一标识,相同文件的哈希值相同,不同文件的哈希值几乎必然不同。通过与数据库中记录的哈希值比较,可以判断文件是否发生变化。
检查文件状态(黄色决策):如果文件在数据库中不存在,说明是新文件;如果哈希值不同,说明文件已被修改;如果哈希值相同,说明文件未变化,无需备份。
复制和更新(绿色区域):对于需要备份的文件,复制到备份目录,保持目录结构。同时更新数据库,记录新的哈希值、备份时间、文件大小等信息。对于无需备份的文件,增加跳过计数。
循环处理(黄色决策):返回继续处理下一个文件,直到所有文件处理完毕。
保存数据库(蓝色区域):将更新后的数据库保存到磁盘,确保下次备份时能正确识别文件变化。数据库使用JSON格式存储,便于查看和调试。
生成统计报告(紫色区域):汇总本次备份的统计信息——新文件数、修改文件数、跳过文件数、错误数、总大小等。这些统计信息用于监控备份效果。
发送通知(黄色决策):如果启用了邮件通知,发送备份完成邮件。邮件包含统计信息和备份时间,让管理员了解备份状态。如果没有启用,直接输出到控制台。
备份完成(绿色区域):本次备份任务结束,等待下次定时触发。
理解这个流程,可以构建各种文件同步和备份系统,支持增量备份、差异备份、版本管理等高级特性。这种自动化系统是数据安全的重要保障。
六、系列总结:走向自动化之路
6.1 知识体系回顾
经过8篇文章的学习,我们已经建立了完整的Python自动化知识体系:
第1篇 - Python基础与编程入门
- 变量、数据类型、运算符
- 输入输出操作
- 基础编程练习
第2篇 - 流程控制与代码结构
- 条件判断与循环
- 异常处理
- 函数基础
第3篇 - 函数与模块化编程
- 函数设计与应用
- 参数传递机制
- 模块与包管理
第4篇 - 数据结构与数据处理
- 列表、字典、集合
- 字符串处理
- 正则表达式
第5篇 - 文件操作与数据存储
- 文件读写
- 路径处理
- 数据序列化
第6篇 - 网络数据抓取与处理
- HTTP请求
- HTML解析
- API调用
- 数据清洗
第7篇 - 办公文档自动化处理
- Excel操作
- Word文档
- PDF处理
- 邮件发送
第8篇 - 定时任务与系统自动化
- 任务调度
- 进程控制
- OCR识别
- RPA自动化
6.2 自动化项目的最佳实践
"""
自动化项目最佳实践
1. 模块化设计
- 功能分离,职责单一
- 配置与代码分离
- 可复用的组件
2. 错误处理
- 全面的异常捕获
- 详细的日志记录
- 优雅的失败处理
3. 安全性
- 敏感信息加密存储
- 输入验证
- 权限控制
4. 可维护性
- 清晰的代码结构
- 完善的文档
- 单元测试
5. 监控与通知
- 执行状态监控
- 异常告警
- 完成通知
"""6.3 持续学习路径
LEARNING_PATH = {
"基础": ["Python语法", "数据结构", "面向对象"],
"进阶": ["并发编程", "网络编程", "数据库"],
"自动化": ["Web自动化", "RPA", "CI/CD"],
"数据": ["数据分析", "机器学习", "数据可视化"],
"工具": ["Git", "Docker", "云服务"]
}
print("Python自动化学习之路没有终点,")
print("持续学习,不断进步!")七、核心概念总结
| 概念 | 定义 | 应用场景 | 注意事项 |
|---|---|---|---|
| schedule | 简单任务调度库 | 轻量级定时任务 | 程序需持续运行 |
| APScheduler | 高级任务调度器 | 企业级调度系统 | 支持持久化和分布式 |
| subprocess | 进程管理模块 | 执行系统命令 | 注意Shell注入风险 |
| Tesseract | OCR引擎 | 图片文字识别 | 预处理影响准确率 |
| PyAutoGUI | RPA自动化库 | 模拟键盘鼠标 | 依赖界面稳定性 |
| FailSafe | 安全保护机制 | 防止失控 | 鼠标移到角落触发 |
常见问题解答
Q1:定时任务在程序关闭后如何执行?
答:Python脚本程序关闭后,所有任务都会停止。要实现持久化的定时任务,需要使用系统级调度工具。
对于Windows系统,使用任务计划程序(Task Scheduler):
import sys
import subprocess
def create_windows_task(script_path, trigger_time):
"""创建Windows定时任务"""
task_name = "PythonBackupTask"
command = f'schtasks /create /tn {task_name} /tr "python {script_path}" /sc daily /st {trigger_time}'
subprocess.run(command, shell=True)
# 使用示例
create_windows_task('backup.py', '02:00')对于Linux/Mac系统,使用cron:
# crontab -e 编辑
# 0 2 * * * /usr/bin/python3 /path/to/backup.py另一个选择是使用持续运行的服务器或云函数(如AWS Lambda、阿里云函数计算),这些平台提供可靠的定时执行机制。
最佳实践是:开发环境用Python调度器,生产环境用系统调度工具或云服务。
Q2:如何处理OCR识别的准确率问题?
答:OCR准确率受多种因素影响,可以通过多方面优化。
图像质量是最重要的因素:确保图像清晰、光照充足、文字与背景对比明显。对于扫描文档,使用300DPI以上的分辨率;对于屏幕截图,确保字体清晰可读。
预处理是提高准确率的关键:根据图像特点选择合适的预处理方法。二值化阈值需要根据实际图像调整;倾斜校正可以显著提高识别效果;降噪和去模糊可以减少干扰。
def advanced_preprocessing(image):
"""高级预处理流程"""
# 1. 灰度化
gray = image.convert('L')
# 2. 自适应阈值二值化
import cv2
binary = cv2.adaptiveThreshold(
np.array(gray), 255,
cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
cv2.THRESH_BINARY, 11, 2
)
# 3. 去噪
denoised = cv2.fastNlMeansDenoising(binary)
# 4. 形态学操作
kernel = np.ones((2,2), np.uint8)
processed = cv2.morphologyEx(denoised, cv2.MORPH_CLOSE, kernel)
return Image.fromarray(processed)语言选择也很重要:使用正确的语言包(chi_sim简体中文、eng英文),对于混合语言文本使用chi_sim+eng。
后处理可以修正常见错误:使用词典检查、正则表达式验证、上下文分析等方法。
如果Tesseract效果不好,考虑商业OCR引擎(如ABBYY、百度OCR),它们通常有更高的准确率。
Q3:RPA脚本如何处理界面变化导致的失败?
答:界面变化是RPA的主要挑战,需要设计健壮的脚本。
首先使用多种识别策略:不仅依赖图像识别,还可以使用文字识别、控件位置、窗口标题等多种方式定位元素。如果一种方式失败,尝试备用方案。
def robust_click(button_image, button_text=None):
"""健壮的点击操作"""
# 方法1: 图像识别
try:
location = pyautogui.locateOnScreen(button_image, confidence=0.8)
if location:
center = pyautogui.center(location)
pyautogui.click(center)
return True
except:
pass
# 方法2: 文字识别
if button_text:
try:
text_location = locate_by_text(button_text)
if text_location:
pyautogui.click(text_location)
return True
except:
pass
# 方法3: 坐标点击(最后手段)
fallback_position = get_fallback_position()
pyautogui.click(fallback_position)
return True添加重试机制:操作失败后等待一段时间重试,最多重试N次。重试之间可以尝试不同的方法。
实现自愈能力:检测常见错误模式(如弹窗、加载中),自动处理。记录失败日志,分析失败原因,持续改进脚本。
定期维护脚本:定期检查脚本运行状态,更新目标图像,调整识别参数。RPA不是设置后不管的,需要持续维护。
对于变化频繁的应用,考虑使用API而不是RPA,或者推动应用方提供自动化接口。
Q4:如何保护自动化脚本中的敏感信息?
答:敏感信息保护是自动化项目的关键安全问题。
绝对禁止将密码硬编码在脚本中,这是最常见也是最危险的做法。即使是私有仓库,代码也可能被意外泄露。
使用环境变量存储敏感信息:
import os
EMAIL_PASSWORD = os.getenv('EMAIL_PASSWORD')
if not EMAIL_PASSWORD:
raise ValueError("EMAIL_PASSWORD环境变量未设置")使用配置管理工具:HashiCorp Vault、AWS Secrets Manager、Azure Key Vault等专门的安全存储方案。
import hvac
client = hvac.Client(url='https://vault.example.com')
client.auth.approle.login(role_id='ROLE_ID', secret_id='SECRET_ID')
secret = client.secrets.kv.v2.read_secret_version(path='email')
password = secret['data']['data']['password']加密配置文件:如果必须使用配置文件,加密存储,运行时解密。
from cryptography.fernet import Fernet
key = Fernet.generate_key()
cipher = Fernet(key)
# 加密
encrypted = cipher.encrypt(b'my_secret_password')
# 解密
decrypted = cipher.decrypt(encrypted)访问控制和审计:限制谁可以访问敏感信息,记录所有访问日志。定期轮换密钥和密码。
最小权限原则:自动化脚本只授予完成任务所需的最小权限,避免过度授权。
Q5:如何监控自动化任务的运行状态?
答:监控是确保自动化系统稳定运行的关键。
基础监控是日志记录:详细记录任务执行的每个步骤、输入输出、异常信息。使用Python的logging模块,可以设置不同的日志级别。
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('automation.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
def automated_task():
logger.info("任务开始")
try:
# 执行任务
logger.info("步骤1完成")
logger.info("步骤2完成")
logger.info("任务成功完成")
except Exception as e:
logger.error(f"任务失败: {e}", exc_info=True)高级监控是收集指标:记录执行次数、成功率、执行时长、资源使用等。使用Prometheus + Grafana等监控系统。
from prometheus_client import Counter, Histogram, start_http_server
# 定义指标
task_counter = Counter('task_executions_total', '任务执行总次数', ['status'])
task_duration = Histogram('task_duration_seconds', '任务执行时长')
def monitored_task():
start = time.time()
try:
# 执行任务
task_counter.labels(status='success').inc()
except Exception as e:
task_counter.labels(status='failure').inc()
raise
finally:
task_duration.observe(time.time() - start)
# 启动指标服务器
start_http_server(8000)告警是异常情况的及时通知:设置告警规则,当连续失败、执行超时、资源异常时发送告警。使用邮件、短信、钉钉、企业微信等渠道。
健康检查是提供简单的状态接口:定期调用健康检查接口,确认自动化系统是否正常运行。
from flask import Flask
app = Flask(__name__)
@app.route('/health')
def health():
# 检查各种依赖
checks = {
'database': check_database(),
'api': check_api(),
'disk': check_disk_space()
}
if all(checks.values()):
return {'status': 'healthy'}, 200
else:
return {'status': 'unhealthy', 'checks': checks}, 503完整的监控体系包括:日志记录、指标收集、告警通知、健康检查。这些措施共同保障自动化系统的稳定性和可靠性。
总结
本文作为本系列的最后一篇,全面介绍了定时任务调度和系统自动化的核心技术。我们学习了时间和定时任务的处理、进程控制与系统命令、OCR图像识别、RPA键盘鼠标自动化,以及构建完整自动化系统的实践方法。
通过本系列8篇文章的学习,你已经掌握了从Python基础到自动化应用的完整知识体系。自动化是提高效率、减少重复劳动的有效手段,但也是一项需要持续学习和实践的技能。
记住,自动化的最终目标是解放人力,让人能够专注于更有创造性的工作。在追求自动化的过程中,也要权衡开发成本和维护成本,选择合适的问题域进行自动化。
祝你在自动化之路上越走越远,用Python创造更大的价值!
系列完结