用Python自动化枯燥的工作 第6篇:网络数据抓取与处理
摘要
本文将带你深入理解从互联网获取和处理数据的核心技术,帮助你掌握网络数据抓取的能力。你将学到Web爬虫的基本原理、HTML解析与数据提取、API调用与数据获取、数据清洗与格式转换,以及实际爬虫项目的开发方法。
学习目标
阅读完本文后,你将能够:
- 爬虫基础:理解HTTP协议和网页结构,掌握requests库的使用方法
- HTML解析:能够使用BeautifulSoup解析HTML,提取所需的数据
- API调用:能够调用REST API获取数据,处理JSON响应
- 数据清洗:掌握数据清洗技巧,处理缺失值和格式问题
- 项目实战:能够构建完整的爬虫项目,遵守网络爬虫的道德规范
一、网络基础:理解HTTP协议
1.1 HTTP请求与响应
HTTP(HyperText Transfer Protocol)是互联网上应用最广泛的协议,理解HTTP是学习网络数据抓取的基础。
# HTTP请求的基本结构
"""
请求行:GET /index.html HTTP/1.1
请求头:
Host: www.example.com
User-Agent: Mozilla/5.0
Accept: text/html
响应行:HTTP/1.1 200 OK
响应头:
Content-Type: text/html
Content-Length: 1234
响应体:<html>...</html>
"""1.2 使用requests库
requests是Python中最流行的HTTP库,提供了简洁易用的API。
import requests
# 基本GET请求
response = requests.get('https://www.example.com')
# 检查请求状态
print(f"状态码: {response.status_code}")
print(f"成功: {response.ok}") # status_code < 400
# 获取响应内容
text = response.text # 文本内容
content = response.content # 字节内容
# 获取JSON数据
response = requests.get('https://api.example.com/data')
data = response.json()
# 带参数的请求
params = {'key1': 'value1', 'key2': 'value2'}
response = requests.get('https://api.example.com/search', params=params)
# 添加请求头
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)',
'Accept': 'text/html'
}
response = requests.get('https://www.example.com', headers=headers)下面的序列图展示了HTTP请求响应的完整过程:
sequenceDiagram participant Client as 爬虫程序 participant Server as Web服务器 Note over Client,Server: 1. 发起HTTP请求 Client->>Server: GET /page.html HTTP/1.1 Note right of Client: 请求头:<br/>Host: example.com<br/>User-Agent: Python Bot Note over Client,Server: 2. 服务器处理请求 Server->>Server: 验证请求<br/>查找资源<br/>准备响应 Note over Client,Server: 3. 返回HTTP响应 Server-->>Client: HTTP/1.1 200 OK Note left of Client: 响应头:<br/>Content-Type: text/html<br/>Content-Length: 1234 Server-->>Client: <html>...内容...</html> Note over Client,Server: 4. 处理响应数据 Client->>Client: 解析HTML<br/>提取数据<br/>保存结果
图表讲解:这个序列图展示了爬虫程序与Web服务器之间的标准交互流程,这是所有网络数据抓取的基础。
发起HTTP请求阶段(绿色区域):爬虫程序构造HTTP请求,包含请求行(指定方法和路径)、请求头(提供元信息如User-Agent)。请求通过互联网发送到Web服务器。requests.get()方法封装了这个过程,开发者只需指定URL和可选参数。
服务器处理请求阶段(蓝色区域):Web服务器接收请求,进行验证和权限检查,然后查找请求的资源(如HTML文件、JSON数据等)。如果资源存在,准备响应;如果不存在,准备错误响应。服务器还可能记录访问日志、执行业务逻辑。
返回HTTP响应阶段(黄色区域):服务器发送响应给客户端。响应包含状态码(200表示成功,404表示未找到,500表示服务器错误)、响应头(描述内容类型、长度等编码信息)、响应体(实际内容,如HTML文档、JSON数据)。
处理响应数据阶段(粉色区域):爬虫程序接收响应,首先检查状态码确保请求成功。然后解析响应内容(HTML、JSON等),使用XPath、CSS选择器或正则表达式提取所需数据。最后将数据保存到文件或数据库。
理解这个流程有助于调试爬虫问题:如果获取不到数据,可能是请求被拒绝(检查User-Agent、Cookie等)、响应解析失败(检查HTML结构变化)、或者触发了反爬机制(需要添加延迟、使用代理等)。
1.3 处理常见的HTTP状态码
import requests
def fetch_url(url):
"""获取URL内容,处理各种状态码"""
try:
response = requests.get(url, timeout=10)
if response.status_code == 200:
print("请求成功")
return response.text
elif response.status_code == 404:
print(f"错误:页面不存在 - {url}")
elif response.status_code == 403:
print("错误:访问被拒绝,可能需要登录或更换User-Agent")
elif response.status_code == 500:
print("错误:服务器内部错误")
else:
print(f"未处理的状态码: {response.status_code}")
return None
except requests.exceptions.Timeout:
print("错误:请求超时")
except requests.exceptions.ConnectionError:
print("错误:无法连接到服务器")
except Exception as e:
print(f"错误: {e}")
return None二、HTML解析:提取网页数据
2.1 HTML基础结构
HTML(HyperText Markup Language)是网页的标记语言,了解其结构是提取数据的基础。
<!DOCTYPE html>
<html>
<head>
<title>页面标题</title>
</head>
<body>
<h1>主标题</h1>
<div class="container">
<p id="intro">这是一段文字</p>
<ul>
<li class="item">项目1</li>
<li class="item">项目2</li>
<li class="item">项目3</li>
</ul>
<a href="https://example.com">链接</a>
</div>
</body>
</html>2.2 使用BeautifulSoup解析HTML
BeautifulSoup是Python中最流行的HTML解析库,能够将HTML文档转换为Python对象树。
from bs4 import BeautifulSoup
import requests
# 获取网页内容
url = 'https://example.com'
response = requests.get(url)
html_content = response.text
# 创建BeautifulSoup对象
soup = BeautifulSoup(html_content, 'html.parser')
# 基本查找方法
# 通过标签查找
title = soup.find('title') # 找到第一个title标签
print(f"页面标题: {title.text}")
# 通过class查找
items = soup.find_all('li', class_='item')
for item in items:
print(item.text)
# 通过id查找
intro = soup.find('p', id='intro')
print(intro.text)
# CSS选择器(更灵活)
container = soup.select_one('.container') # 第一个匹配的元素
all_items = soup.select('.item') # 所有匹配的元素
# 链式查找
links = soup.select('div.container a')
for link in links:
print(f"链接文本: {link.text}")
print(f"链接地址: {link.get('href')}")2.3 常用的数据提取模式
from bs4 import BeautifulSoup
def extract_page_data(html):
"""提取页面数据的常见模式"""
soup = BeautifulSoup(html, 'html.parser')
data = {}
# 提取文本
data['title'] = soup.find('h1').get_text(strip=True)
# 提取属性
image = soup.find('img')
data['image_url'] = image.get('src') if image else None
# 提取所有链接
data['links'] = [a.get('href') for a in soup.find_all('a') if a.get('href')]
# 提取表格数据
table = soup.find('table')
if table:
rows = []
for tr in table.find_all('tr'):
cells = [td.get_text(strip=True) for td in tr.find_all(['td', 'th'])]
rows.append(cells)
data['table'] = rows
# 提取带有特定属性的元素
special_divs = soup.find_all('div', attrs={'data-type': 'special'})
data['special_items'] = [div.get_text(strip=True) for div in special_divs]
return data下面的流程图展示了HTML解析和数据提取的完整流程:
flowchart TD A[获取HTML内容] --> B[创建BeautifulSoup对象] B --> C[解析HTML结构] C --> D{需要提取什么?} D -->|单一元素| E[使用find方法] D -->|多个元素| F[使用find_all方法] D -->|复杂选择| G[使用CSS选择器] E --> H[获取元素内容] F --> H G --> H H --> I{需要什么信息?} I -->|文本内容| J[使用.text或.get_text] I -->|属性值| K[使用.get方法] I -->|HTML| L[使用.decode_contents] J --> M[清洗和格式化数据] K --> M L --> M M --> N[保存到数据结构] N --> O[输出或存储] style A fill:#e1f5e1 style O fill:#e1f5ff style D fill:#fff5e1 style I fill:#fff5e1
图表讲解:这个流程图展示了HTML解析和数据提取的完整过程,帮助理解如何从HTML中获取所需数据。
流程从获取HTML内容开始(绿色区域):这通常通过requests.get()完成,得到包含网页源代码的字符串。然后创建BeautifulSoup对象并指定解析器('html.parser'是Python内置的),BeautifulSoup将HTML字符串解析为可操作的Python对象树。
解析完成后进入选择阶段(黄色决策):需要提取什么类型的元素?如果只需要第一个匹配的元素,使用find()方法;如果需要所有匹配的元素,使用find_all()方法;如果选择条件复杂(如同时满足多个条件、嵌套选择),使用CSS选择器(select()方法),支持.class、#id、tag.class等语法。
选择到元素后,进入信息提取阶段(第二个黄色决策):需要什么类型的信息?文本内容使用.text或.get_text()方法获取;属性值(如href、src)使用.get()方法;原始HTML使用.decode_contents()方法。.get_text()可以添加strip=True参数去除多余空白。
提取的信息需要清洗和格式化(蓝色区域):去除多余空白、转换数据类型、处理缺失值等。然后将清洗后的数据保存到Python数据结构(列表、字典等),最后输出到控制台或保存到文件。
这个流程是HTML解析的标准模式,理解它可以根据不同的网页结构灵活调整提取策略。
三、API调用:获取结构化数据
3.1 REST API基础
API(Application Programming Interface)允许程序以结构化的方式访问数据。REST(Representational State Transfer)是最常见的API架构风格。
import requests
import json
# API基础URL
base_url = 'https://api.example.com'
# GET请求 - 获取数据
response = requests.get(f'{base_url}/users')
if response.status_code == 200:
users = response.json()
print(f"获取到 {len(users)} 个用户")
# 带查询参数的GET请求
params = {
'page': 1,
'per_page': 20,
'sort': 'name'
}
response = requests.get(f'{base_url}/users', params=params)
# POST请求 - 创建数据
new_user = {
'name': '张三',
'email': '[email protected]'
}
response = requests.post(f'{base_url}/users', json=new_user)
# PUT请求 - 更新数据
updated_user = {'name': '李四'}
response = requests.put(f'{base_url}/users/1', json=updated_user)
# DELETE请求 - 删除数据
response = requests.delete(f'{base_url}/users/1')3.2 处理API响应
import requests
from typing import Optional, Dict, Any
class APIClient:
"""简单的API客户端"""
def __init__(self, base_url: str, api_key: Optional[str] = None):
self.base_url = base_url
self.headers = {}
if api_key:
self.headers['Authorization'] = f'Bearer {api_key}'
def get(self, endpoint: str, params: Optional[Dict] = None) -> Dict[str, Any]:
"""GET请求"""
url = f'{self.base_url}{endpoint}'
response = requests.get(url, headers=self.headers, params=params)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"API请求失败: {response.status_code}")
def post(self, endpoint: str, data: Dict[str, Any]) -> Dict[str, Any]:
"""POST请求"""
url = f'{self.base_url}{endpoint}'
response = requests.post(url, headers=self.headers, json=data)
if response.status_code in [200, 201]:
return response.json()
else:
raise Exception(f"API请求失败: {response.status_code}")
# 使用示例
client = APIClient('https://api.github.com')
# 获取用户信息
try:
user_info = client.get('/users/octocat')
print(f"用户: {user_info['name']}")
print(f"仓库数: {user_info['public_repos']}")
except Exception as e:
print(f"错误: {e}")3.3 处理分页数据
import requests
import time
def fetch_all_data(base_url, params=None, max_pages=10):
"""获取所有分页数据"""
all_data = []
page = 1
while page <= max_pages:
# 构造请求参数
request_params = (params or {}).copy()
request_params['page'] = page
request_params['per_page'] = 100
try:
response = requests.get(base_url, params=request_params)
if response.status_code != 200:
print(f"请求失败: {response.status_code}")
break
data = response.json()
# 检查是否还有数据
if not data:
break
all_data.extend(data)
print(f"获取第 {page} 页,累计 {len(all_data)} 条")
page += 1
# 添加延迟避免请求过快
time.sleep(0.5)
except Exception as e:
print(f"发生错误: {e}")
break
return all_data
# 使用示例
# url = 'https://api.github.com/users/python/repos'
# repos = fetch_all_data(url)
# print(f"共获取 {len(repos)} 个仓库")下面的序列图展示了API调用的完整流程,包括认证、分页和错误处理:
sequenceDiagram participant Client as 爬虫程序 participant API as API服务器 participant DB as 数据库 Note over Client,DB: 第1页请求 Client->>API: GET /users?page=1<br/>Authorization: Bearer token API->>API: 验证令牌<br/>查询数据库 API->>DB: SELECT * FROM users<br/>LIMIT 100 OFFSET 0 DB-->>API: 返回用户数据 API-->>Client: HTTP 200 OK<br/>{"users": [...], "has_more": true} Note over Client,DB: 处理第1页数据 Client->>Client: 解析JSON<br/>提取数据<br/>保存结果 Note over Client,DB: 第2页请求 Client->>API: GET /users?page=2<br/>Authorization: Bearer token API->>DB: SELECT * FROM users<br/>LIMIT 100 OFFSET 100 DB-->>API: 返回用户数据 API-->>Client: HTTP 200 OK<br/>{"users": [...], "has_more": true} Note over Client,DB: 重复直到has_more=false Client->>API: GET /users?page=N API-->>Client: HTTP 200 OK<br/>{"users": [...], "has_more": false} Note over Client,DB: 完成 Client->>Client: 合并所有数据<br/>输出或存储
图表讲解:这个序列图展示了分页API调用的完整流程,这是获取大量数据的标准模式。
首先发起第1页请求(绿色区域):爬虫程序构造GET请求,包含认证信息(如Bearer token)和分页参数(page=1)。API服务器验证令牌的有效性,然后从数据库查询数据。数据库执行带偏移量的查询(LIMIT 100 OFFSET 0),返回第一页的100条记录。API将记录打包成JSON响应返回,同时包含元数据如has_more表示是否还有更多数据。
客户端处理第1页数据(蓝色区域):解析JSON响应,提取用户数据列表,保存到内存或文件。然后检查has_more字段,如果为true,继续请求下一页。
第2页请求与第1页类似,但参数变为page=2,数据库查询的偏移量变为OFFSET 100,跳过已获取的100条记录,返回接下来的100条。
这个过程重复进行(黄色循环),每次页码加1,偏移量增加100,直到API返回has_more=false或达到预设的最大页数限制。
最后(粉色区域):客户端合并所有页的数据,得到完整的记录集合,然后输出或存储。这个模式适用于大多数分页API,只需要根据具体API调整分页参数名(page/offset等)和响应格式。
理解这个流程,可以编写通用的分页数据获取函数,自动处理所有分页逻辑,简化调用代码。
四、数据清洗:确保数据质量
4.1 数据清洗的重要性
从网页或API获取的数据往往需要清洗才能使用。常见问题包括:缺失值、重复数据、格式不一致、异常值等。
4.2 文本数据清洗
import re
def clean_text(text):
"""清洗文本数据"""
if not isinstance(text, str):
return str(text)
# 去除多余空白
text = ' '.join(text.split())
# 去除特殊字符(保留中文、字母、数字、常用标点)
text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z0-9\s,。!?、;:""''()]', '', text)
# 统一全角半角标点
text = text.replace(',', ',').replace('。', '.')
text = text.replace('!', '!').replace('?', '?')
return text.strip()
# 批量清洗
texts = [
" 这是 一段 有 多余 空白 的 文本 ",
"这段文本有特殊字符@#$%^&*",
"全角标点,应该转换。"
]
cleaned = [clean_text(t) for t in texts]
for original, clean in zip(texts, cleaned):
print(f"原始: {original}")
print(f"清洗: {clean}\n")4.3 数值数据清洗
import re
from typing import Optional, Union
def clean_number(value) -> Optional[float]:
"""清洗数值数据"""
if value is None:
return None
if isinstance(value, (int, float)):
return float(value)
if isinstance(value, str):
# 去除非数字字符(保留负号、小数点)
cleaned = re.sub(r'[^\d.-]', '', value)
if cleaned:
try:
return float(cleaned)
except ValueError:
return None
return None
def clean_price(price_str: str) -> Optional[float]:
"""清洗价格数据"""
if not price_str:
return None
# 移除货币符号和千位分隔符
price_str = price_str.replace('¥', '').replace('$', '').replace(',', '')
# 处理"万元"等单位
if '万' in price_str:
number = clean_number(price_str.replace('万', ''))
return number * 10000 if number else None
return clean_number(price_str)
# 使用示例
prices = ['¥1,234.56', '$2,345.67', '100万元', 'abc', '']
cleaned_prices = [clean_price(p) for p in prices]
print(cleaned_prices)4.4 处理缺失值
import pandas as pd
from typing import List, Dict, Any
def handle_missing_values(data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""处理缺失值"""
cleaned = []
for item in data:
cleaned_item = {}
for key, value in item.items():
# 跳过None值
if value is None:
continue
# 跳过空字符串
if isinstance(value, str) and not value.strip():
continue
# 跳过空列表
if isinstance(value, list) and not value:
continue
cleaned_item[key] = value
cleaned.append(cleaned_item)
return cleaned
def fill_missing_values(data: List[Dict[str, Any]], defaults: Dict[str, Any]) -> List[Dict[str, Any]]:
"""用默认值填充缺失字段"""
filled = []
for item in data:
filled_item = defaults.copy()
filled_item.update(item)
filled.append(filled_item)
return filled
# 使用示例
data = [
{'name': '张三', 'age': 25, 'city': '北京'},
{'name': '李四', 'age': None}, # 缺少city,age为None
{'name': '', 'age': 30, 'city': '上海'}, # name为空字符串
]
# 方式1:移除缺失值
cleaned = handle_missing_values(data)
print("移除缺失值后:")
for item in cleaned:
print(item)
# 方式2:填充默认值
filled = fill_missing_values(data, {'name': '未知', 'age': 0, 'city': '未知'})
print("\n填充默认值后:")
for item in filled:
print(item)4.5 去重和验证
from typing import List, Dict, Any, Callable
def remove_duplicates(data: List[Dict[str, Any]], key: str) -> List[Dict[str, Any]]:
"""根据指定字段去重"""
seen = set()
unique = []
for item in data:
value = item.get(key)
if value not in seen:
seen.add(value)
unique.append(item)
return unique
def validate_data(data: List[Dict[str, Any]], validators: Dict[str, Callable]) -> List[Dict[str, Any]]:
"""验证数据"""
valid = []
for item in data:
is_valid = True
for field, validator in validators.items():
value = item.get(field)
if not validator(value):
print(f"验证失败: {field}={value}")
is_valid = False
break
if is_valid:
valid.append(item)
return valid
# 使用示例
data = [
{'id': 1, 'email': '[email protected]', 'age': 25},
{'id': 2, 'email': '[email protected]', 'age': 30},
{'id': 1, 'email': '[email protected]', 'age': 25}, # 重复
{'id': 3, 'email': 'invalid-email', 'age': -5}, # 无效数据
]
# 去重
unique_data = remove_duplicates(data, 'id')
print(f"去重后: {len(unique_data)} 条")
# 验证
validators = {
'email': lambda x: '@' in str(x) if x else False,
'age': lambda x: isinstance(x, int) and 0 < x < 150
}
valid_data = validate_data(unique_data, validators)
print(f"验证后: {len(valid_data)} 条")下面的流程图展示了数据清洗的完整流程:
flowchart TD A[获取原始数据] --> B[数据加载和检查] B --> C{数据质量如何?} C -->|有缺失值| D[处理缺失值] C -->|有重复| E[去重] C -->|格式问题| F[格式转换] D --> G{处理方式} G -->|删除| H[移除缺失记录] G -->|填充| I[使用默认值] E --> J[识别重复记录] J --> K[保留首次出现] F --> L[文本规范化] F --> M[数值转换] F --> N[日期格式化] H --> O[合并清洗结果] I --> O K --> O L --> O M --> O N --> O O --> P[验证数据质量] P --> Q{验证通过?} Q -->|否| R[记录问题数据] Q -->|是| S[数据可用] R --> T[修正或排除] T --> S S --> U[保存清洗后数据] style A fill:#e1f5e1 style U fill:#e1f5ff style C fill:#fff5e1 style G fill:#fff5e1 style Q fill:#fff5e1
图表讲解:这个流程图展示了数据清洗的完整流程,包含了各种数据质量问题的处理方法。
流程从获取原始数据开始(绿色区域):首先进行数据加载和初步检查,了解数据的基本情况,如字段类型、缺失值比例、数据范围等。
然后进行数据质量评估(第一个黄色决策):根据发现的问题类型选择不同的处理路径。
如果存在缺失值(蓝色区域):需要决定处理方式。可以选择删除包含缺失值的记录(简单但可能丢失信息),或使用填充策略(保留记录但可能引入偏差)。填充策略包括使用默认值、均值、中位数、前向/后向填充等。
如果存在重复记录(粉色区域):需要识别重复的标准(如某个字段完全相同,或组合字段相同)。去重时通常保留首次出现的记录,删除后续重复。
如果存在格式问题(紫色区域):需要根据数据类型进行转换。文本规范化包括统一大小写、去除空白、特殊字符处理等;数值转换包括去除非数字字符、处理单位(如”万元”)、类型转换等;日期格式化包括识别各种日期格式、统一为标准格式。
清洗后的数据需要合并(蓝色区域),将各种处理的结果整合。
然后进行数据验证(第二个黄色决策):检查数据是否符合预期,如值域是否合理、格式是否正确、业务规则是否满足。如果验证失败,记录问题数据,决定修正或排除。如果验证通过,数据可以投入使用(绿色区域)。
最后保存清洗后的数据(绿色区域),以便后续分析和使用。理解这个流程可以系统化地处理各种数据质量问题。
五、实战案例:构建新闻爬虫
5.1 项目概述
构建一个新闻网站爬虫,能够:
- 抓取新闻列表
- 提取新闻详情
- 保存为结构化数据
- 支持增量更新
5.2 完整实现
import requests
from bs4 import BeautifulSoup
from typing import List, Dict, Any, Optional
from datetime import datetime
from pathlib import Path
import json
import time
class NewsCrawler:
"""新闻爬虫类"""
def __init__(self, base_url: str, output_dir: str = 'news_data'):
self.base_url = base_url
self.output_dir = Path(output_dir)
self.output_dir.mkdir(exist_ok=True)
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
})
def fetch_news_list(self, page: int = 1) -> List[Dict[str, str]]:
"""获取新闻列表"""
url = f"{self.base_url}/news?page={page}"
try:
response = self.session.get(url, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
news_items = []
for item in soup.select('.news-item'):
title_elem = item.select_one('.title a')
link_elem = item.select_one('.title a')
date_elem = item.select_one('.date')
summary_elem = item.select_one('.summary')
if title_elem and link_elem:
news_items.append({
'title': title_elem.get_text(strip=True),
'link': link_elem.get('href'),
'date': date_elem.get_text(strip=True) if date_elem else '',
'summary': summary_elem.get_text(strip=True) if summary_elem else ''
})
return news_items
except Exception as e:
print(f"获取新闻列表失败: {e}")
return []
def fetch_news_detail(self, news_url: str) -> Optional[Dict[str, Any]]:
"""获取新闻详情"""
if not news_url.startswith('http'):
news_url = f"{self.base_url}{news_url}"
try:
response = self.session.get(news_url, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
# 提取标题
title = soup.select_one('.article-title')
title = title.get_text(strip=True) if title else ''
# 提取内容
content_elem = soup.select_one('.article-content')
content = ''
if content_elem:
paragraphs = content_elem.find_all('p')
content = '\n\n'.join([p.get_text(strip=True) for p in paragraphs])
# 提取元数据
author = soup.select_one('.author')
author = author.get_text(strip=True) if author else ''
publish_time = soup.select_one('.publish-time')
publish_time = publish_time.get_text(strip=True) if publish_time else ''
# 提取图片
images = []
for img in soup.select('.article-content img'):
img_url = img.get('src') or img.get('data-src')
if img_url:
images.append(img_url)
return {
'title': title,
'content': content,
'author': author,
'publish_time': publish_time,
'images': images,
'url': news_url,
'crawled_at': datetime.now().isoformat()
}
except Exception as e:
print(f"获取新闻详情失败 ({news_url}): {e}")
return None
def save_news(self, news_data: Dict[str, Any]):
"""保存新闻数据"""
# 从URL生成文件名
filename = news_data['url'].split('/')[-1] or str(hash(news_data['url']))
filepath = self.output_dir / f"{filename}.json"
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(news_data, f, ensure_ascii=False, indent=2)
def crawl(self, max_pages: int = 5, delay: float = 1.0):
"""执行爬取"""
all_news = []
for page in range(1, max_pages + 1):
print(f"正在获取第 {page} 页...")
news_list = self.fetch_news_list(page)
if not news_list:
print("没有更多新闻")
break
for item in news_list:
print(f" 抓取: {item['title']}")
detail = self.fetch_news_detail(item['link'])
if detail:
self.save_news(detail)
all_news.append(detail)
time.sleep(delay)
print(f"\n爬取完成!共获取 {len(all_news)} 条新闻")
return all_news
# 使用示例
if __name__ == "__main__":
crawler = NewsCrawler('https://example-news-site.com')
news = crawler.crawl(max_pages=3, delay=0.5)5.3 增量更新功能
import hashlib
class IncrementalNewsCrawler(NewsCrawler):
"""支持增量更新的新闻爬虫"""
def __init__(self, base_url: str, output_dir: str = 'news_data'):
super().__init__(base_url, output_dir)
self.crawled_urls = self._load_crawled_urls()
def _load_crawled_urls(self) -> set:
"""加载已爬取的URL"""
crawled_file = self.output_dir / 'crawled_urls.txt'
if crawled_file.exists():
with open(crawled_file, 'r', encoding='utf-8') as f:
return set(line.strip() for line in f)
return set()
def _save_crawled_url(self, url: str):
"""保存已爬取的URL"""
crawled_file = self.output_dir / 'crawled_urls.txt'
with open(crawled_file, 'a', encoding='utf-8') as f:
f.write(f"{url}\n")
self.crawled_urls.add(url)
def fetch_news_detail(self, news_url: str) -> Optional[Dict[str, Any]]:
"""获取新闻详情(跳过已爬取)"""
if news_url in self.crawled_urls:
print(f" 跳过(已爬取): {news_url}")
return None
detail = super().fetch_news_detail(news_url)
if detail:
self._save_crawled_url(news_url)
return detail
def crawl_new_only(self, max_pages: int = 5):
"""只爬取新内容"""
return self.crawl(max_pages)下面的流程图展示了新闻爬虫的完整工作流程:
flowchart TD A[开始爬取] --> B[初始化爬虫] B --> C[设置页码为1] C --> D[获取新闻列表页] D --> E{列表页成功?} E -->|否| F[检查重试次数] E -->|是| G[解析新闻链接] F --> H{达到最大重试?} H -->|是| I[结束爬取] H -->|否| D G --> J[遍历新闻链接] J --> K{还有链接?} K -->|否| L[页码+1] K -->|是| M[检查是否已爬取] M --> N{已爬取?} N -->|是| O[跳过此链接] N -->|否| P[获取详情页] P --> Q{详情页成功?} Q -->|否| O Q -->|是| R[解析新闻内容] R --> S[提取标题、正文、图片] S --> T[保存为JSON] T --> U[记录已爬取URL] U --> V[添加延迟] V --> O O --> K L --> W{达到最大页数?} W -->|是| I W -->|否| D I --> X[生成统计报告] X --> Y[输出结果] style A fill:#e1f5e1 style Y fill:#e1f5ff style E fill:#fff5e1 style H fill:#fff5e1 style N fill:#fff5e1 style Q fill:#fff5e1 style W fill:#fff5e1
图表讲解:这个流程图展示了新闻爬虫从初始化到完成报告的完整工作流程,包含了错误处理和增量更新机制。
爬取开始(绿色区域):初始化爬虫对象,设置基础URL、输出目录、请求头等。设置页码从1开始。
进入主循环:获取新闻列表页(第一个黄色决策)。如果请求失败,检查重试次数(黄色决策),如果达到最大重试次数则结束爬取(红色区域),否则重试。
如果列表页成功(蓝色区域):解析HTML,提取每条新闻的标题、链接、日期、摘要等信息。然后遍历所有新闻链接(黄色决策),对每条链接检查是否已爬取(黄色决策)。如果已爬取则跳过,这是增量更新机制的核心。
如果未爬取(粉色区域):获取新闻详情页。请求可能失败(黄色决策),失败则跳过该链接;成功则解析详情页HTML,提取标题、正文、作者、发布时间、图片等完整内容。
提取完成后(绿色区域):将数据保存为JSON文件,使用URL的哈希值作为文件名避免冲突。记录URL到已爬取集合,支持增量更新。添加延迟避免请求过快触发反爬机制(黄色决策)。
然后处理下一条链接,直到当前页所有链接处理完毕。页码加1,检查是否达到最大页数限制(黄色决策),如果达到则结束爬取(红色区域),否则继续下一页。
最终(蓝色区域):生成统计报告(如成功抓取数量、失败数量、总耗时等),输出结果或发送通知。
这个流程展示了健壮爬虫的完整设计:错误处理和重试、增量更新避免重复爬取、延迟控制请求速率、完整的日志和统计。理解这个流程可以构建各种类型的网络爬虫。
六、核心概念总结
| 概念 | 定义 | 应用场景 | 注意事项 |
|---|---|---|---|
| HTTP协议 | 互联网数据传输协议 | 所有网络请求 | 理解请求头、状态码 |
| requests库 | Python HTTP库 | 发送网络请求 | 使用timeout避免阻塞 |
| BeautifulSoup | HTML解析库 | 提取网页数据 | 检查元素存在性 |
| CSS选择器 | 元素定位语法 | 精确选择元素 | .class、#id、tag |
| REST API | 结构化数据接口 | 获取JSON数据 | 处理分页和限流 |
| 数据清洗 | 数据质量处理 | 确保数据可用 | 处理缺失和异常 |
| 反爬机制 | 网站防护措施 | 绕过限制 | 遵守规则,控制频率 |
常见问题解答
Q1:网站反爬虫机制有哪些,如何应对?
答:常见的反爬虫机制包括User-Agent检测、IP限制、请求频率限制、验证码、登录要求等。
User-Agent检测是最基本的反爬措施,服务器检查请求头中的User-Agent字段,识别是否为浏览器。应对方法是设置常见的浏览器User-Agent:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
response = requests.get(url, headers=headers)IP限制通过记录IP地址的请求次数,超过阈值就封禁。应对方法包括使用代理IP池、控制请求频率、分布式爬虫。请求频率要合理,通常每次请求间隔1-3秒,避免给服务器造成压力。
验证码用于区分人类和机器。应对方法包括使用验证码识别服务(OCR)、手动输入、或者寻找不需要验证码的数据源。对于简单验证码,可以使用tesseract-ocr等工具识别。
登录要求针对需要登录才能访问的内容。应对方法包括使用requests.Session保存Cookie、模拟登录流程、使用Selenium等浏览器自动化工具。
其他技术包括JavaScript渲染(需要用Selenium或Pyppeteer)、Cookie追踪(保存和传递Cookie)、HTTPS加密(处理SSL证书)。但最重要的道德原则是遵守robots.txt协议,不要对服务器造成过大压力。
# 示例:基本的反爬应对
import requests
import time
from fake_useragent import UserAgent
ua = UserAgent()
session = requests.Session()
def fetch_with_retry(url, max_retries=3):
for i in range(max_retries):
try:
headers = {'User-Agent': ua.random}
response = session.get(url, headers=headers, timeout=10)
time.sleep(2) # 控制频率
return response
except Exception as e:
if i == max_retries - 1:
raise
time.sleep(5)
return NoneQ2:爬虫的速度应该控制在什么范围?如何避免被封IP?
答:爬虫速度取决于目标网站的承受能力和数据需求,基本原则是”最小必要原则”。
速度控制有几个层次:请求间延迟、并发控制、峰值限制。请求间延迟是最基本的控制,通常每次请求间隔1-3秒,对友好爬虫可以更长。并发控制是限制同时进行的请求数量,通常不超过5个并发连接。峰值限制是在一定时间窗口内限制总请求数,如每分钟不超过20次。
避免被封IP的核心是模仿人类行为模式:使用真实浏览器的User-Agent、随机化请求间隔、偶尔的长暂停、避免固定的时间模式。使用Session保持连接复用,减少连接开销。
代理IP池是分散请求的有效方法:通过多个IP地址发送请求,单个IP的请求量被分散。但要注意代理的质量和成本,以及使用代理的合法性。
import time
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
def crawl_url(url):
time.sleep(random.uniform(1, 3)) # 随机延迟
response = requests.get(url)
return response
urls = [...] # URL列表
# 控制并发数
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(crawl_url, url) for url in urls]
for future in as_completed(futures):
result = future.result()最重要的道德考量:遵守robots.txt协议,只爬取公开数据,不绕过明确的访问限制,对服务器友好。爬虫应该是数据的合理获取工具,而不是攻击工具。
Q3:如何处理JavaScript渲染的网页?
答:JavaScript渲染的网页需要执行JavaScript代码才能看到完整内容,传统的requests+BeautifulSoup无法处理。
这类网页的加载过程:浏览器首先收到基本的HTML框架,然后执行JavaScript代码,通过AJAX请求获取数据,最后动态更新页面内容。使用requests只能获取初始HTML,看不到动态加载的内容。
解决方案包括:分析网络请求(直接调用数据API)、使用Selenium浏览器自动化、使用Pyppeteer无头浏览器、分析JavaScript逻辑。
最推荐的方法是分析网络请求:打开浏览器开发者工具的Network选项卡,加载页面,观察XHR/Fetch请求。通常能找到返回JSON数据的API接口,直接调用这个API比渲染整个页面高效得多。
# 示例:直接调用API获取数据
import requests
url = "https://example.com/api/news"
params = {'page': 1, 'limit': 20}
headers = {
'User-Agent': 'Mozilla/5.0...',
'Referer': 'https://example.com/news',
'X-Requested-With': 'XMLHttpRequest'
}
response = requests.get(url, params=params, headers=headers)
data = response.json()如果无法找到API,需要使用浏览器自动化工具。Selenium控制真实浏览器,支持各种浏览器,但资源消耗大。Pyppeteer基于无头Chrome,更轻量,适合服务器环境。
# 示例:使用Selenium
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
driver = webdriver.Chrome()
driver.get('https://example.com')
# 等待元素加载
wait = WebDriverWait(driver, 10)
element = wait.until(EC.presence_of_element_located((By.CLASS_NAME, 'content')))
# 获取渲染后的HTML
html = driver.page_source
driver.quit()选择原则:优先尝试直接调用API(最快、最省资源),其次使用Pyppeteer(无头、轻量),最后用Selenium(兼容性好但重)。根据具体网站的反爬强度和数据重要性选择合适方案。
Q4:如何设计可维护的爬虫代码结构?
答:可维护的爬虫代码应该模块化、配置化、易于扩展。
模块化设计将爬虫分解为独立的组件:请求模块(处理HTTP请求)、解析模块(提取数据)、存储模块(保存数据)、调度模块(控制流程)。每个模块有明确的职责,可以独立测试和修改。
配置化将硬编码的值提取为配置:目标URL、输出目录、请求延迟、重试次数等。配置可以放在文件(JSON/YAML)中,便于修改而不动代码。
# config.yaml
target:
base_url: "https://example.com"
start_page: 1
max_pages: 10
crawler:
delay: 2.0
timeout: 10
max_retries: 3
concurrent: 3
storage:
output_dir: "output"
format: "json"
# crawler.py
import yaml
class CrawlerConfig:
def __init__(self, config_file):
with open(config_file) as f:
self.config = yaml.safe_load(f)
@property
def base_url(self):
return self.config['target']['base_url']错误处理和日志记录是可维护性的关键:使用try-except捕获异常,记录详细的错误信息(URL、状态码、异常类型),避免单点失败导致整个爬虫停止。使用logging模块而不是print,可以控制日志级别和输出位置。
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('crawler.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
def fetch_url(url):
try:
response = requests.get(url, timeout=10)
logger.info(f"成功获取: {url}")
return response
except requests.Timeout:
logger.error(f"超时: {url}")
except Exception as e:
logger.exception(f"请求失败: {url}")数据验证确保存储的数据质量:定义数据模型或schema,验证必需字段、数据类型、值范围。不符合条件的数据记录到错误列表,不混入正常数据。
单元测试保证代码质量:为每个模块编写测试用例,mock网络请求,测试解析逻辑。CI/CD自动化测试确保代码修改不会引入bug。
文档和注释帮助维护:在代码中添加注释解释复杂逻辑,编写README说明项目结构、依赖安装、使用方法。维护问题日志记录遇到的问题和解决方案。
Q5:爬虫数据的存储方案应该如何选择?
答:存储方案取决于数据量、查询需求、更新频率等因素。
小规模数据(MB级别)适合文件存储:JSON格式可读性好,易于调试;CSV格式适合表格数据,Excel可直接打开;Pickle格式保留Python对象,但不可读。文件存储简单直接,但查询和更新不方便。
import json
from pathlib import Path
def save_to_json(data, filepath):
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def load_from_json(filepath):
with open(filepath, 'r', encoding='utf-8') as f:
return json.load(f)中等规模数据(GB级别)适合SQLite数据库:轻量级、无需服务器、支持SQL查询。适合单机爬虫,需要按条件查询或去重的场景。SQLite支持事务、索引,性能比文件好得多。
import sqlite3
def init_db(db_path):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS news (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
url TEXT UNIQUE,
content TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
return conn
def insert_news(conn, news_data):
cursor = conn.cursor()
try:
cursor.execute('''
INSERT INTO news (title, url, content)
VALUES (?, ?, ?)
''', (news_data['title'], news_data['url'], news_data['content']))
conn.commit()
except sqlite3.IntegrityError:
pass # URL重复大规模数据(TB级别)或高并发场景需要专业数据库:PostgreSQL适合复杂查询和数据分析,MongoDB适合非结构化数据,Elasticsearch适合全文搜索。但这些需要额外的基础设施和运维成本。
决策建议:首先评估数据规模和增长速度,然后考虑查询需求(简单查询用文件,复杂查询用数据库),考虑更新频率(频繁更新用数据库),考虑部署环境(本地用SQLite,云上用云数据库)。大多数爬虫项目从JSON/SQLite开始,需要时再升级。
对于长期项目,建议设计数据访问层抽象,底层存储可以替换而不影响上层逻辑。使用ORM(如SQLAlchemy)可以简化数据库操作,同时支持多种数据库后端。
总结
本文全面介绍了网络数据抓取与处理的核心技术。我们学习了HTTP协议的基础、requests库的使用、BeautifulSoup解析HTML、API调用获取数据、数据清洗确保质量,以及构建完整爬虫项目的实践方法。
网络数据抓取是自动化获取信息的重要手段,但必须遵守道德规范和法律边界。合理的爬虫应该对服务器友好、遵守robots.txt、尊重数据所有权。掌握这些技术,可以从互联网获取有价值的数据,为分析和决策提供支持。
下篇预告
下一篇我们将深入探讨办公文档自动化处理,带你了解如何自动化处理Excel表格、Word文档、PDF文件和邮件发送。你将学会使用Python处理各种办公文档,大幅提高工作效率。