用Python自动化枯燥的工作 第6篇:网络数据抓取与处理

摘要

本文将带你深入理解从互联网获取和处理数据的核心技术,帮助你掌握网络数据抓取的能力。你将学到Web爬虫的基本原理、HTML解析与数据提取、API调用与数据获取、数据清洗与格式转换,以及实际爬虫项目的开发方法。

学习目标

阅读完本文后,你将能够:

  • 爬虫基础:理解HTTP协议和网页结构,掌握requests库的使用方法
  • HTML解析:能够使用BeautifulSoup解析HTML,提取所需的数据
  • API调用:能够调用REST API获取数据,处理JSON响应
  • 数据清洗:掌握数据清洗技巧,处理缺失值和格式问题
  • 项目实战:能够构建完整的爬虫项目,遵守网络爬虫的道德规范

一、网络基础:理解HTTP协议

1.1 HTTP请求与响应

HTTP(HyperText Transfer Protocol)是互联网上应用最广泛的协议,理解HTTP是学习网络数据抓取的基础。

# HTTP请求的基本结构
"""
请求行:GET /index.html HTTP/1.1
请求头:
    Host: www.example.com
    User-Agent: Mozilla/5.0
    Accept: text/html
 
响应行:HTTP/1.1 200 OK
响应头:
    Content-Type: text/html
    Content-Length: 1234
响应体:<html>...</html>
"""

1.2 使用requests库

requests是Python中最流行的HTTP库,提供了简洁易用的API。

import requests
 
# 基本GET请求
response = requests.get('https://www.example.com')
 
# 检查请求状态
print(f"状态码: {response.status_code}")
print(f"成功: {response.ok}")  # status_code < 400
 
# 获取响应内容
text = response.text  # 文本内容
content = response.content  # 字节内容
 
# 获取JSON数据
response = requests.get('https://api.example.com/data')
data = response.json()
 
# 带参数的请求
params = {'key1': 'value1', 'key2': 'value2'}
response = requests.get('https://api.example.com/search', params=params)
 
# 添加请求头
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)',
    'Accept': 'text/html'
}
response = requests.get('https://www.example.com', headers=headers)

下面的序列图展示了HTTP请求响应的完整过程:

sequenceDiagram
    participant Client as 爬虫程序
    participant Server as Web服务器

    Note over Client,Server: 1. 发起HTTP请求
    Client->>Server: GET /page.html HTTP/1.1
    Note right of Client: 请求头:<br/>Host: example.com<br/>User-Agent: Python Bot

    Note over Client,Server: 2. 服务器处理请求
    Server->>Server: 验证请求<br/>查找资源<br/>准备响应

    Note over Client,Server: 3. 返回HTTP响应
    Server-->>Client: HTTP/1.1 200 OK
    Note left of Client: 响应头:<br/>Content-Type: text/html<br/>Content-Length: 1234
    Server-->>Client: &lt;html&gt;...内容...&lt;/html&gt;

    Note over Client,Server: 4. 处理响应数据
    Client->>Client: 解析HTML<br/>提取数据<br/>保存结果

图表讲解:这个序列图展示了爬虫程序与Web服务器之间的标准交互流程,这是所有网络数据抓取的基础。

发起HTTP请求阶段(绿色区域):爬虫程序构造HTTP请求,包含请求行(指定方法和路径)、请求头(提供元信息如User-Agent)。请求通过互联网发送到Web服务器。requests.get()方法封装了这个过程,开发者只需指定URL和可选参数。

服务器处理请求阶段(蓝色区域):Web服务器接收请求,进行验证和权限检查,然后查找请求的资源(如HTML文件、JSON数据等)。如果资源存在,准备响应;如果不存在,准备错误响应。服务器还可能记录访问日志、执行业务逻辑。

返回HTTP响应阶段(黄色区域):服务器发送响应给客户端。响应包含状态码(200表示成功,404表示未找到,500表示服务器错误)、响应头(描述内容类型、长度等编码信息)、响应体(实际内容,如HTML文档、JSON数据)。

处理响应数据阶段(粉色区域):爬虫程序接收响应,首先检查状态码确保请求成功。然后解析响应内容(HTML、JSON等),使用XPath、CSS选择器或正则表达式提取所需数据。最后将数据保存到文件或数据库。

理解这个流程有助于调试爬虫问题:如果获取不到数据,可能是请求被拒绝(检查User-Agent、Cookie等)、响应解析失败(检查HTML结构变化)、或者触发了反爬机制(需要添加延迟、使用代理等)。

1.3 处理常见的HTTP状态码

import requests
 
def fetch_url(url):
    """获取URL内容,处理各种状态码"""
    try:
        response = requests.get(url, timeout=10)
 
        if response.status_code == 200:
            print("请求成功")
            return response.text
 
        elif response.status_code == 404:
            print(f"错误:页面不存在 - {url}")
 
        elif response.status_code == 403:
            print("错误:访问被拒绝,可能需要登录或更换User-Agent")
 
        elif response.status_code == 500:
            print("错误:服务器内部错误")
 
        else:
            print(f"未处理的状态码: {response.status_code}")
 
        return None
 
    except requests.exceptions.Timeout:
        print("错误:请求超时")
    except requests.exceptions.ConnectionError:
        print("错误:无法连接到服务器")
    except Exception as e:
        print(f"错误: {e}")
 
    return None

二、HTML解析:提取网页数据

2.1 HTML基础结构

HTML(HyperText Markup Language)是网页的标记语言,了解其结构是提取数据的基础。

<!DOCTYPE html>
<html>
<head>
    <title>页面标题</title>
</head>
<body>
    <h1>主标题</h1>
    <div class="container">
        <p id="intro">这是一段文字</p>
        <ul>
            <li class="item">项目1</li>
            <li class="item">项目2</li>
            <li class="item">项目3</li>
        </ul>
        <a href="https://example.com">链接</a>
    </div>
</body>
</html>

2.2 使用BeautifulSoup解析HTML

BeautifulSoup是Python中最流行的HTML解析库,能够将HTML文档转换为Python对象树。

from bs4 import BeautifulSoup
import requests
 
# 获取网页内容
url = 'https://example.com'
response = requests.get(url)
html_content = response.text
 
# 创建BeautifulSoup对象
soup = BeautifulSoup(html_content, 'html.parser')
 
# 基本查找方法
# 通过标签查找
title = soup.find('title')  # 找到第一个title标签
print(f"页面标题: {title.text}")
 
# 通过class查找
items = soup.find_all('li', class_='item')
for item in items:
    print(item.text)
 
# 通过id查找
intro = soup.find('p', id='intro')
print(intro.text)
 
# CSS选择器(更灵活)
container = soup.select_one('.container')  # 第一个匹配的元素
all_items = soup.select('.item')  # 所有匹配的元素
 
# 链式查找
links = soup.select('div.container a')
for link in links:
    print(f"链接文本: {link.text}")
    print(f"链接地址: {link.get('href')}")

2.3 常用的数据提取模式

from bs4 import BeautifulSoup
 
def extract_page_data(html):
    """提取页面数据的常见模式"""
    soup = BeautifulSoup(html, 'html.parser')
 
    data = {}
 
    # 提取文本
    data['title'] = soup.find('h1').get_text(strip=True)
 
    # 提取属性
    image = soup.find('img')
    data['image_url'] = image.get('src') if image else None
 
    # 提取所有链接
    data['links'] = [a.get('href') for a in soup.find_all('a') if a.get('href')]
 
    # 提取表格数据
    table = soup.find('table')
    if table:
        rows = []
        for tr in table.find_all('tr'):
            cells = [td.get_text(strip=True) for td in tr.find_all(['td', 'th'])]
            rows.append(cells)
        data['table'] = rows
 
    # 提取带有特定属性的元素
    special_divs = soup.find_all('div', attrs={'data-type': 'special'})
    data['special_items'] = [div.get_text(strip=True) for div in special_divs]
 
    return data

下面的流程图展示了HTML解析和数据提取的完整流程:

flowchart TD
    A[获取HTML内容] --> B[创建BeautifulSoup对象]
    B --> C[解析HTML结构]

    C --> D{需要提取什么?}

    D -->|单一元素| E[使用find方法]
    D -->|多个元素| F[使用find_all方法]
    D -->|复杂选择| G[使用CSS选择器]

    E --> H[获取元素内容]
    F --> H
    G --> H

    H --> I{需要什么信息?}

    I -->|文本内容| J[使用.text或.get_text]
    I -->|属性值| K[使用.get方法]
    I -->|HTML| L[使用.decode_contents]

    J --> M[清洗和格式化数据]
    K --> M
    L --> M

    M --> N[保存到数据结构]
    N --> O[输出或存储]

    style A fill:#e1f5e1
    style O fill:#e1f5ff
    style D fill:#fff5e1
    style I fill:#fff5e1

图表讲解:这个流程图展示了HTML解析和数据提取的完整过程,帮助理解如何从HTML中获取所需数据。

流程从获取HTML内容开始(绿色区域):这通常通过requests.get()完成,得到包含网页源代码的字符串。然后创建BeautifulSoup对象并指定解析器('html.parser'是Python内置的),BeautifulSoup将HTML字符串解析为可操作的Python对象树。

解析完成后进入选择阶段(黄色决策):需要提取什么类型的元素?如果只需要第一个匹配的元素,使用find()方法;如果需要所有匹配的元素,使用find_all()方法;如果选择条件复杂(如同时满足多个条件、嵌套选择),使用CSS选择器(select()方法),支持.class#idtag.class等语法。

选择到元素后,进入信息提取阶段(第二个黄色决策):需要什么类型的信息?文本内容使用.text.get_text()方法获取;属性值(如href、src)使用.get()方法;原始HTML使用.decode_contents()方法。.get_text()可以添加strip=True参数去除多余空白。

提取的信息需要清洗和格式化(蓝色区域):去除多余空白、转换数据类型、处理缺失值等。然后将清洗后的数据保存到Python数据结构(列表、字典等),最后输出到控制台或保存到文件。

这个流程是HTML解析的标准模式,理解它可以根据不同的网页结构灵活调整提取策略。


三、API调用:获取结构化数据

3.1 REST API基础

API(Application Programming Interface)允许程序以结构化的方式访问数据。REST(Representational State Transfer)是最常见的API架构风格。

import requests
import json
 
# API基础URL
base_url = 'https://api.example.com'
 
# GET请求 - 获取数据
response = requests.get(f'{base_url}/users')
if response.status_code == 200:
    users = response.json()
    print(f"获取到 {len(users)} 个用户")
 
# 带查询参数的GET请求
params = {
    'page': 1,
    'per_page': 20,
    'sort': 'name'
}
response = requests.get(f'{base_url}/users', params=params)
 
# POST请求 - 创建数据
new_user = {
    'name': '张三',
    'email': '[email protected]'
}
response = requests.post(f'{base_url}/users', json=new_user)
 
# PUT请求 - 更新数据
updated_user = {'name': '李四'}
response = requests.put(f'{base_url}/users/1', json=updated_user)
 
# DELETE请求 - 删除数据
response = requests.delete(f'{base_url}/users/1')

3.2 处理API响应

import requests
from typing import Optional, Dict, Any
 
class APIClient:
    """简单的API客户端"""
 
    def __init__(self, base_url: str, api_key: Optional[str] = None):
        self.base_url = base_url
        self.headers = {}
        if api_key:
            self.headers['Authorization'] = f'Bearer {api_key}'
 
    def get(self, endpoint: str, params: Optional[Dict] = None) -> Dict[str, Any]:
        """GET请求"""
        url = f'{self.base_url}{endpoint}'
        response = requests.get(url, headers=self.headers, params=params)
 
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"API请求失败: {response.status_code}")
 
    def post(self, endpoint: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """POST请求"""
        url = f'{self.base_url}{endpoint}'
        response = requests.post(url, headers=self.headers, json=data)
 
        if response.status_code in [200, 201]:
            return response.json()
        else:
            raise Exception(f"API请求失败: {response.status_code}")
 
# 使用示例
client = APIClient('https://api.github.com')
 
# 获取用户信息
try:
    user_info = client.get('/users/octocat')
    print(f"用户: {user_info['name']}")
    print(f"仓库数: {user_info['public_repos']}")
except Exception as e:
    print(f"错误: {e}")

3.3 处理分页数据

import requests
import time
 
def fetch_all_data(base_url, params=None, max_pages=10):
    """获取所有分页数据"""
    all_data = []
    page = 1
 
    while page <= max_pages:
        # 构造请求参数
        request_params = (params or {}).copy()
        request_params['page'] = page
        request_params['per_page'] = 100
 
        try:
            response = requests.get(base_url, params=request_params)
 
            if response.status_code != 200:
                print(f"请求失败: {response.status_code}")
                break
 
            data = response.json()
 
            # 检查是否还有数据
            if not data:
                break
 
            all_data.extend(data)
            print(f"获取第 {page} 页,累计 {len(all_data)} 条")
 
            page += 1
 
            # 添加延迟避免请求过快
            time.sleep(0.5)
 
        except Exception as e:
            print(f"发生错误: {e}")
            break
 
    return all_data
 
# 使用示例
# url = 'https://api.github.com/users/python/repos'
# repos = fetch_all_data(url)
# print(f"共获取 {len(repos)} 个仓库")

下面的序列图展示了API调用的完整流程,包括认证、分页和错误处理:

sequenceDiagram
    participant Client as 爬虫程序
    participant API as API服务器
    participant DB as 数据库

    Note over Client,DB: 第1页请求
    Client->>API: GET /users?page=1<br/>Authorization: Bearer token
    API->>API: 验证令牌<br/>查询数据库

    API->>DB: SELECT * FROM users<br/>LIMIT 100 OFFSET 0
    DB-->>API: 返回用户数据

    API-->>Client: HTTP 200 OK<br/>{"users": [...], "has_more": true}

    Note over Client,DB: 处理第1页数据
    Client->>Client: 解析JSON<br/>提取数据<br/>保存结果

    Note over Client,DB: 第2页请求
    Client->>API: GET /users?page=2<br/>Authorization: Bearer token
    API->>DB: SELECT * FROM users<br/>LIMIT 100 OFFSET 100
    DB-->>API: 返回用户数据
    API-->>Client: HTTP 200 OK<br/>{"users": [...], "has_more": true}

    Note over Client,DB: 重复直到has_more=false
    Client->>API: GET /users?page=N
    API-->>Client: HTTP 200 OK<br/>{"users": [...], "has_more": false}

    Note over Client,DB: 完成
    Client->>Client: 合并所有数据<br/>输出或存储

图表讲解:这个序列图展示了分页API调用的完整流程,这是获取大量数据的标准模式。

首先发起第1页请求(绿色区域):爬虫程序构造GET请求,包含认证信息(如Bearer token)和分页参数(page=1)。API服务器验证令牌的有效性,然后从数据库查询数据。数据库执行带偏移量的查询(LIMIT 100 OFFSET 0),返回第一页的100条记录。API将记录打包成JSON响应返回,同时包含元数据如has_more表示是否还有更多数据。

客户端处理第1页数据(蓝色区域):解析JSON响应,提取用户数据列表,保存到内存或文件。然后检查has_more字段,如果为true,继续请求下一页。

第2页请求与第1页类似,但参数变为page=2,数据库查询的偏移量变为OFFSET 100,跳过已获取的100条记录,返回接下来的100条。

这个过程重复进行(黄色循环),每次页码加1,偏移量增加100,直到API返回has_more=false或达到预设的最大页数限制。

最后(粉色区域):客户端合并所有页的数据,得到完整的记录集合,然后输出或存储。这个模式适用于大多数分页API,只需要根据具体API调整分页参数名(page/offset等)和响应格式。

理解这个流程,可以编写通用的分页数据获取函数,自动处理所有分页逻辑,简化调用代码。


四、数据清洗:确保数据质量

4.1 数据清洗的重要性

从网页或API获取的数据往往需要清洗才能使用。常见问题包括:缺失值、重复数据、格式不一致、异常值等。

4.2 文本数据清洗

import re
 
def clean_text(text):
    """清洗文本数据"""
    if not isinstance(text, str):
        return str(text)
 
    # 去除多余空白
    text = ' '.join(text.split())
 
    # 去除特殊字符(保留中文、字母、数字、常用标点)
    text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z0-9\s,。!?、;:""''()]', '', text)
 
    # 统一全角半角标点
    text = text.replace(',', ',').replace('。', '.')
    text = text.replace('!', '!').replace('?', '?')
 
    return text.strip()
 
# 批量清洗
texts = [
    "  这是  一段  有  多余  空白  的  文本  ",
    "这段文本有特殊字符@#$%^&*",
    "全角标点,应该转换。"
]
 
cleaned = [clean_text(t) for t in texts]
for original, clean in zip(texts, cleaned):
    print(f"原始: {original}")
    print(f"清洗: {clean}\n")

4.3 数值数据清洗

import re
from typing import Optional, Union
 
def clean_number(value) -> Optional[float]:
    """清洗数值数据"""
    if value is None:
        return None
 
    if isinstance(value, (int, float)):
        return float(value)
 
    if isinstance(value, str):
        # 去除非数字字符(保留负号、小数点)
        cleaned = re.sub(r'[^\d.-]', '', value)
 
        if cleaned:
            try:
                return float(cleaned)
            except ValueError:
                return None
 
    return None
 
def clean_price(price_str: str) -> Optional[float]:
    """清洗价格数据"""
    if not price_str:
        return None
 
    # 移除货币符号和千位分隔符
    price_str = price_str.replace('¥', '').replace('$', '').replace(',', '')
 
    # 处理"万元"等单位
    if '万' in price_str:
        number = clean_number(price_str.replace('万', ''))
        return number * 10000 if number else None
 
    return clean_number(price_str)
 
# 使用示例
prices = ['¥1,234.56', '$2,345.67', '100万元', 'abc', '']
cleaned_prices = [clean_price(p) for p in prices]
print(cleaned_prices)

4.4 处理缺失值

import pandas as pd
from typing import List, Dict, Any
 
def handle_missing_values(data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """处理缺失值"""
    cleaned = []
 
    for item in data:
        cleaned_item = {}
 
        for key, value in item.items():
            # 跳过None值
            if value is None:
                continue
 
            # 跳过空字符串
            if isinstance(value, str) and not value.strip():
                continue
 
            # 跳过空列表
            if isinstance(value, list) and not value:
                continue
 
            cleaned_item[key] = value
 
        cleaned.append(cleaned_item)
 
    return cleaned
 
def fill_missing_values(data: List[Dict[str, Any]], defaults: Dict[str, Any]) -> List[Dict[str, Any]]:
    """用默认值填充缺失字段"""
    filled = []
 
    for item in data:
        filled_item = defaults.copy()
        filled_item.update(item)
        filled.append(filled_item)
 
    return filled
 
# 使用示例
data = [
    {'name': '张三', 'age': 25, 'city': '北京'},
    {'name': '李四', 'age': None},  # 缺少city,age为None
    {'name': '', 'age': 30, 'city': '上海'},  # name为空字符串
]
 
# 方式1:移除缺失值
cleaned = handle_missing_values(data)
print("移除缺失值后:")
for item in cleaned:
    print(item)
 
# 方式2:填充默认值
filled = fill_missing_values(data, {'name': '未知', 'age': 0, 'city': '未知'})
print("\n填充默认值后:")
for item in filled:
    print(item)

4.5 去重和验证

from typing import List, Dict, Any, Callable
 
def remove_duplicates(data: List[Dict[str, Any]], key: str) -> List[Dict[str, Any]]:
    """根据指定字段去重"""
    seen = set()
    unique = []
 
    for item in data:
        value = item.get(key)
        if value not in seen:
            seen.add(value)
            unique.append(item)
 
    return unique
 
def validate_data(data: List[Dict[str, Any]], validators: Dict[str, Callable]) -> List[Dict[str, Any]]:
    """验证数据"""
    valid = []
 
    for item in data:
        is_valid = True
 
        for field, validator in validators.items():
            value = item.get(field)
 
            if not validator(value):
                print(f"验证失败: {field}={value}")
                is_valid = False
                break
 
        if is_valid:
            valid.append(item)
 
    return valid
 
# 使用示例
data = [
    {'id': 1, 'email': '[email protected]', 'age': 25},
    {'id': 2, 'email': '[email protected]', 'age': 30},
    {'id': 1, 'email': '[email protected]', 'age': 25},  # 重复
    {'id': 3, 'email': 'invalid-email', 'age': -5},  # 无效数据
]
 
# 去重
unique_data = remove_duplicates(data, 'id')
print(f"去重后: {len(unique_data)} 条")
 
# 验证
validators = {
    'email': lambda x: '@' in str(x) if x else False,
    'age': lambda x: isinstance(x, int) and 0 < x < 150
}
valid_data = validate_data(unique_data, validators)
print(f"验证后: {len(valid_data)} 条")

下面的流程图展示了数据清洗的完整流程:

flowchart TD
    A[获取原始数据] --> B[数据加载和检查]
    B --> C{数据质量如何?}

    C -->|有缺失值| D[处理缺失值]
    C -->|有重复| E[去重]
    C -->|格式问题| F[格式转换]

    D --> G{处理方式}
    G -->|删除| H[移除缺失记录]
    G -->|填充| I[使用默认值]

    E --> J[识别重复记录]
    J --> K[保留首次出现]

    F --> L[文本规范化]
    F --> M[数值转换]
    F --> N[日期格式化]

    H --> O[合并清洗结果]
    I --> O
    K --> O
    L --> O
    M --> O
    N --> O

    O --> P[验证数据质量]
    P --> Q{验证通过?}

    Q -->|否| R[记录问题数据]
    Q -->|是| S[数据可用]

    R --> T[修正或排除]
    T --> S

    S --> U[保存清洗后数据]

    style A fill:#e1f5e1
    style U fill:#e1f5ff
    style C fill:#fff5e1
    style G fill:#fff5e1
    style Q fill:#fff5e1

图表讲解:这个流程图展示了数据清洗的完整流程,包含了各种数据质量问题的处理方法。

流程从获取原始数据开始(绿色区域):首先进行数据加载和初步检查,了解数据的基本情况,如字段类型、缺失值比例、数据范围等。

然后进行数据质量评估(第一个黄色决策):根据发现的问题类型选择不同的处理路径。

如果存在缺失值(蓝色区域):需要决定处理方式。可以选择删除包含缺失值的记录(简单但可能丢失信息),或使用填充策略(保留记录但可能引入偏差)。填充策略包括使用默认值、均值、中位数、前向/后向填充等。

如果存在重复记录(粉色区域):需要识别重复的标准(如某个字段完全相同,或组合字段相同)。去重时通常保留首次出现的记录,删除后续重复。

如果存在格式问题(紫色区域):需要根据数据类型进行转换。文本规范化包括统一大小写、去除空白、特殊字符处理等;数值转换包括去除非数字字符、处理单位(如”万元”)、类型转换等;日期格式化包括识别各种日期格式、统一为标准格式。

清洗后的数据需要合并(蓝色区域),将各种处理的结果整合。

然后进行数据验证(第二个黄色决策):检查数据是否符合预期,如值域是否合理、格式是否正确、业务规则是否满足。如果验证失败,记录问题数据,决定修正或排除。如果验证通过,数据可以投入使用(绿色区域)。

最后保存清洗后的数据(绿色区域),以便后续分析和使用。理解这个流程可以系统化地处理各种数据质量问题。


五、实战案例:构建新闻爬虫

5.1 项目概述

构建一个新闻网站爬虫,能够:

  1. 抓取新闻列表
  2. 提取新闻详情
  3. 保存为结构化数据
  4. 支持增量更新

5.2 完整实现

import requests
from bs4 import BeautifulSoup
from typing import List, Dict, Any, Optional
from datetime import datetime
from pathlib import Path
import json
import time
 
class NewsCrawler:
    """新闻爬虫类"""
 
    def __init__(self, base_url: str, output_dir: str = 'news_data'):
        self.base_url = base_url
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)
 
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
 
    def fetch_news_list(self, page: int = 1) -> List[Dict[str, str]]:
        """获取新闻列表"""
        url = f"{self.base_url}/news?page={page}"
 
        try:
            response = self.session.get(url, timeout=10)
            response.raise_for_status()
 
            soup = BeautifulSoup(response.text, 'html.parser')
 
            news_items = []
            for item in soup.select('.news-item'):
                title_elem = item.select_one('.title a')
                link_elem = item.select_one('.title a')
                date_elem = item.select_one('.date')
                summary_elem = item.select_one('.summary')
 
                if title_elem and link_elem:
                    news_items.append({
                        'title': title_elem.get_text(strip=True),
                        'link': link_elem.get('href'),
                        'date': date_elem.get_text(strip=True) if date_elem else '',
                        'summary': summary_elem.get_text(strip=True) if summary_elem else ''
                    })
 
            return news_items
 
        except Exception as e:
            print(f"获取新闻列表失败: {e}")
            return []
 
    def fetch_news_detail(self, news_url: str) -> Optional[Dict[str, Any]]:
        """获取新闻详情"""
        if not news_url.startswith('http'):
            news_url = f"{self.base_url}{news_url}"
 
        try:
            response = self.session.get(news_url, timeout=10)
            response.raise_for_status()
 
            soup = BeautifulSoup(response.text, 'html.parser')
 
            # 提取标题
            title = soup.select_one('.article-title')
            title = title.get_text(strip=True) if title else ''
 
            # 提取内容
            content_elem = soup.select_one('.article-content')
            content = ''
            if content_elem:
                paragraphs = content_elem.find_all('p')
                content = '\n\n'.join([p.get_text(strip=True) for p in paragraphs])
 
            # 提取元数据
            author = soup.select_one('.author')
            author = author.get_text(strip=True) if author else ''
 
            publish_time = soup.select_one('.publish-time')
            publish_time = publish_time.get_text(strip=True) if publish_time else ''
 
            # 提取图片
            images = []
            for img in soup.select('.article-content img'):
                img_url = img.get('src') or img.get('data-src')
                if img_url:
                    images.append(img_url)
 
            return {
                'title': title,
                'content': content,
                'author': author,
                'publish_time': publish_time,
                'images': images,
                'url': news_url,
                'crawled_at': datetime.now().isoformat()
            }
 
        except Exception as e:
            print(f"获取新闻详情失败 ({news_url}): {e}")
            return None
 
    def save_news(self, news_data: Dict[str, Any]):
        """保存新闻数据"""
        # 从URL生成文件名
        filename = news_data['url'].split('/')[-1] or str(hash(news_data['url']))
        filepath = self.output_dir / f"{filename}.json"
 
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(news_data, f, ensure_ascii=False, indent=2)
 
    def crawl(self, max_pages: int = 5, delay: float = 1.0):
        """执行爬取"""
        all_news = []
 
        for page in range(1, max_pages + 1):
            print(f"正在获取第 {page} 页...")
 
            news_list = self.fetch_news_list(page)
            if not news_list:
                print("没有更多新闻")
                break
 
            for item in news_list:
                print(f"  抓取: {item['title']}")
 
                detail = self.fetch_news_detail(item['link'])
                if detail:
                    self.save_news(detail)
                    all_news.append(detail)
 
                time.sleep(delay)
 
        print(f"\n爬取完成!共获取 {len(all_news)} 条新闻")
        return all_news
 
 
# 使用示例
if __name__ == "__main__":
    crawler = NewsCrawler('https://example-news-site.com')
    news = crawler.crawl(max_pages=3, delay=0.5)

5.3 增量更新功能

import hashlib
 
class IncrementalNewsCrawler(NewsCrawler):
    """支持增量更新的新闻爬虫"""
 
    def __init__(self, base_url: str, output_dir: str = 'news_data'):
        super().__init__(base_url, output_dir)
        self.crawled_urls = self._load_crawled_urls()
 
    def _load_crawled_urls(self) -> set:
        """加载已爬取的URL"""
        crawled_file = self.output_dir / 'crawled_urls.txt'
        if crawled_file.exists():
            with open(crawled_file, 'r', encoding='utf-8') as f:
                return set(line.strip() for line in f)
        return set()
 
    def _save_crawled_url(self, url: str):
        """保存已爬取的URL"""
        crawled_file = self.output_dir / 'crawled_urls.txt'
        with open(crawled_file, 'a', encoding='utf-8') as f:
            f.write(f"{url}\n")
        self.crawled_urls.add(url)
 
    def fetch_news_detail(self, news_url: str) -> Optional[Dict[str, Any]]:
        """获取新闻详情(跳过已爬取)"""
        if news_url in self.crawled_urls:
            print(f"  跳过(已爬取): {news_url}")
            return None
 
        detail = super().fetch_news_detail(news_url)
        if detail:
            self._save_crawled_url(news_url)
 
        return detail
 
    def crawl_new_only(self, max_pages: int = 5):
        """只爬取新内容"""
        return self.crawl(max_pages)

下面的流程图展示了新闻爬虫的完整工作流程:

flowchart TD
    A[开始爬取] --> B[初始化爬虫]
    B --> C[设置页码为1]

    C --> D[获取新闻列表页]
    D --> E{列表页成功?}

    E -->|否| F[检查重试次数]
    E -->|是| G[解析新闻链接]

    F --> H{达到最大重试?}
    H -->|是| I[结束爬取]
    H -->|否| D

    G --> J[遍历新闻链接]
    J --> K{还有链接?}

    K -->|否| L[页码+1]
    K -->|是| M[检查是否已爬取]

    M --> N{已爬取?}
    N -->|是| O[跳过此链接]
    N -->|否| P[获取详情页]

    P --> Q{详情页成功?}
    Q -->|否| O
    Q -->|是| R[解析新闻内容]

    R --> S[提取标题、正文、图片]
    S --> T[保存为JSON]

    T --> U[记录已爬取URL]
    U --> V[添加延迟]

    V --> O
    O --> K
    L --> W{达到最大页数?}

    W -->|是| I
    W -->|否| D

    I --> X[生成统计报告]
    X --> Y[输出结果]

    style A fill:#e1f5e1
    style Y fill:#e1f5ff
    style E fill:#fff5e1
    style H fill:#fff5e1
    style N fill:#fff5e1
    style Q fill:#fff5e1
    style W fill:#fff5e1

图表讲解:这个流程图展示了新闻爬虫从初始化到完成报告的完整工作流程,包含了错误处理和增量更新机制。

爬取开始(绿色区域):初始化爬虫对象,设置基础URL、输出目录、请求头等。设置页码从1开始。

进入主循环:获取新闻列表页(第一个黄色决策)。如果请求失败,检查重试次数(黄色决策),如果达到最大重试次数则结束爬取(红色区域),否则重试。

如果列表页成功(蓝色区域):解析HTML,提取每条新闻的标题、链接、日期、摘要等信息。然后遍历所有新闻链接(黄色决策),对每条链接检查是否已爬取(黄色决策)。如果已爬取则跳过,这是增量更新机制的核心。

如果未爬取(粉色区域):获取新闻详情页。请求可能失败(黄色决策),失败则跳过该链接;成功则解析详情页HTML,提取标题、正文、作者、发布时间、图片等完整内容。

提取完成后(绿色区域):将数据保存为JSON文件,使用URL的哈希值作为文件名避免冲突。记录URL到已爬取集合,支持增量更新。添加延迟避免请求过快触发反爬机制(黄色决策)。

然后处理下一条链接,直到当前页所有链接处理完毕。页码加1,检查是否达到最大页数限制(黄色决策),如果达到则结束爬取(红色区域),否则继续下一页。

最终(蓝色区域):生成统计报告(如成功抓取数量、失败数量、总耗时等),输出结果或发送通知。

这个流程展示了健壮爬虫的完整设计:错误处理和重试、增量更新避免重复爬取、延迟控制请求速率、完整的日志和统计。理解这个流程可以构建各种类型的网络爬虫。


六、核心概念总结

概念定义应用场景注意事项
HTTP协议互联网数据传输协议所有网络请求理解请求头、状态码
requests库Python HTTP库发送网络请求使用timeout避免阻塞
BeautifulSoupHTML解析库提取网页数据检查元素存在性
CSS选择器元素定位语法精确选择元素.class、#id、tag
REST API结构化数据接口获取JSON数据处理分页和限流
数据清洗数据质量处理确保数据可用处理缺失和异常
反爬机制网站防护措施绕过限制遵守规则,控制频率

常见问题解答

Q1:网站反爬虫机制有哪些,如何应对?

:常见的反爬虫机制包括User-Agent检测、IP限制、请求频率限制、验证码、登录要求等。

User-Agent检测是最基本的反爬措施,服务器检查请求头中的User-Agent字段,识别是否为浏览器。应对方法是设置常见的浏览器User-Agent:

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
response = requests.get(url, headers=headers)

IP限制通过记录IP地址的请求次数,超过阈值就封禁。应对方法包括使用代理IP池、控制请求频率、分布式爬虫。请求频率要合理,通常每次请求间隔1-3秒,避免给服务器造成压力。

验证码用于区分人类和机器。应对方法包括使用验证码识别服务(OCR)、手动输入、或者寻找不需要验证码的数据源。对于简单验证码,可以使用tesseract-ocr等工具识别。

登录要求针对需要登录才能访问的内容。应对方法包括使用requests.Session保存Cookie、模拟登录流程、使用Selenium等浏览器自动化工具。

其他技术包括JavaScript渲染(需要用Selenium或Pyppeteer)、Cookie追踪(保存和传递Cookie)、HTTPS加密(处理SSL证书)。但最重要的道德原则是遵守robots.txt协议,不要对服务器造成过大压力。

# 示例:基本的反爬应对
import requests
import time
from fake_useragent import UserAgent
 
ua = UserAgent()
session = requests.Session()
 
def fetch_with_retry(url, max_retries=3):
    for i in range(max_retries):
        try:
            headers = {'User-Agent': ua.random}
            response = session.get(url, headers=headers, timeout=10)
            time.sleep(2)  # 控制频率
            return response
        except Exception as e:
            if i == max_retries - 1:
                raise
            time.sleep(5)
    return None

Q2:爬虫的速度应该控制在什么范围?如何避免被封IP?

:爬虫速度取决于目标网站的承受能力和数据需求,基本原则是”最小必要原则”。

速度控制有几个层次:请求间延迟、并发控制、峰值限制。请求间延迟是最基本的控制,通常每次请求间隔1-3秒,对友好爬虫可以更长。并发控制是限制同时进行的请求数量,通常不超过5个并发连接。峰值限制是在一定时间窗口内限制总请求数,如每分钟不超过20次。

避免被封IP的核心是模仿人类行为模式:使用真实浏览器的User-Agent、随机化请求间隔、偶尔的长暂停、避免固定的时间模式。使用Session保持连接复用,减少连接开销。

代理IP池是分散请求的有效方法:通过多个IP地址发送请求,单个IP的请求量被分散。但要注意代理的质量和成本,以及使用代理的合法性。

import time
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
 
def crawl_url(url):
    time.sleep(random.uniform(1, 3))  # 随机延迟
    response = requests.get(url)
    return response
 
urls = [...]  # URL列表
 
# 控制并发数
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(crawl_url, url) for url in urls]
    for future in as_completed(futures):
        result = future.result()

最重要的道德考量:遵守robots.txt协议,只爬取公开数据,不绕过明确的访问限制,对服务器友好。爬虫应该是数据的合理获取工具,而不是攻击工具。


Q3:如何处理JavaScript渲染的网页?

:JavaScript渲染的网页需要执行JavaScript代码才能看到完整内容,传统的requests+BeautifulSoup无法处理。

这类网页的加载过程:浏览器首先收到基本的HTML框架,然后执行JavaScript代码,通过AJAX请求获取数据,最后动态更新页面内容。使用requests只能获取初始HTML,看不到动态加载的内容。

解决方案包括:分析网络请求(直接调用数据API)、使用Selenium浏览器自动化、使用Pyppeteer无头浏览器、分析JavaScript逻辑。

最推荐的方法是分析网络请求:打开浏览器开发者工具的Network选项卡,加载页面,观察XHR/Fetch请求。通常能找到返回JSON数据的API接口,直接调用这个API比渲染整个页面高效得多。

# 示例:直接调用API获取数据
import requests
 
url = "https://example.com/api/news"
params = {'page': 1, 'limit': 20}
headers = {
    'User-Agent': 'Mozilla/5.0...',
    'Referer': 'https://example.com/news',
    'X-Requested-With': 'XMLHttpRequest'
}
 
response = requests.get(url, params=params, headers=headers)
data = response.json()

如果无法找到API,需要使用浏览器自动化工具。Selenium控制真实浏览器,支持各种浏览器,但资源消耗大。Pyppeteer基于无头Chrome,更轻量,适合服务器环境。

# 示例:使用Selenium
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
 
driver = webdriver.Chrome()
driver.get('https://example.com')
 
# 等待元素加载
wait = WebDriverWait(driver, 10)
element = wait.until(EC.presence_of_element_located((By.CLASS_NAME, 'content')))
 
# 获取渲染后的HTML
html = driver.page_source
driver.quit()

选择原则:优先尝试直接调用API(最快、最省资源),其次使用Pyppeteer(无头、轻量),最后用Selenium(兼容性好但重)。根据具体网站的反爬强度和数据重要性选择合适方案。


Q4:如何设计可维护的爬虫代码结构?

:可维护的爬虫代码应该模块化、配置化、易于扩展。

模块化设计将爬虫分解为独立的组件:请求模块(处理HTTP请求)、解析模块(提取数据)、存储模块(保存数据)、调度模块(控制流程)。每个模块有明确的职责,可以独立测试和修改。

配置化将硬编码的值提取为配置:目标URL、输出目录、请求延迟、重试次数等。配置可以放在文件(JSON/YAML)中,便于修改而不动代码。

# config.yaml
target:
  base_url: "https://example.com"
  start_page: 1
  max_pages: 10
 
crawler:
  delay: 2.0
  timeout: 10
  max_retries: 3
  concurrent: 3
 
storage:
  output_dir: "output"
  format: "json"
 
# crawler.py
import yaml
 
class CrawlerConfig:
    def __init__(self, config_file):
        with open(config_file) as f:
            self.config = yaml.safe_load(f)
 
    @property
    def base_url(self):
        return self.config['target']['base_url']

错误处理和日志记录是可维护性的关键:使用try-except捕获异常,记录详细的错误信息(URL、状态码、异常类型),避免单点失败导致整个爬虫停止。使用logging模块而不是print,可以控制日志级别和输出位置。

import logging
 
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('crawler.log'),
        logging.StreamHandler()
    ]
)
 
logger = logging.getLogger(__name__)
 
def fetch_url(url):
    try:
        response = requests.get(url, timeout=10)
        logger.info(f"成功获取: {url}")
        return response
    except requests.Timeout:
        logger.error(f"超时: {url}")
    except Exception as e:
        logger.exception(f"请求失败: {url}")

数据验证确保存储的数据质量:定义数据模型或schema,验证必需字段、数据类型、值范围。不符合条件的数据记录到错误列表,不混入正常数据。

单元测试保证代码质量:为每个模块编写测试用例,mock网络请求,测试解析逻辑。CI/CD自动化测试确保代码修改不会引入bug。

文档和注释帮助维护:在代码中添加注释解释复杂逻辑,编写README说明项目结构、依赖安装、使用方法。维护问题日志记录遇到的问题和解决方案。


Q5:爬虫数据的存储方案应该如何选择?

:存储方案取决于数据量、查询需求、更新频率等因素。

小规模数据(MB级别)适合文件存储:JSON格式可读性好,易于调试;CSV格式适合表格数据,Excel可直接打开;Pickle格式保留Python对象,但不可读。文件存储简单直接,但查询和更新不方便。

import json
from pathlib import Path
 
def save_to_json(data, filepath):
    with open(filepath, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
 
def load_from_json(filepath):
    with open(filepath, 'r', encoding='utf-8') as f:
        return json.load(f)

中等规模数据(GB级别)适合SQLite数据库:轻量级、无需服务器、支持SQL查询。适合单机爬虫,需要按条件查询或去重的场景。SQLite支持事务、索引,性能比文件好得多。

import sqlite3
 
def init_db(db_path):
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS news (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT NOT NULL,
            url TEXT UNIQUE,
            content TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    conn.commit()
    return conn
 
def insert_news(conn, news_data):
    cursor = conn.cursor()
    try:
        cursor.execute('''
            INSERT INTO news (title, url, content)
            VALUES (?, ?, ?)
        ''', (news_data['title'], news_data['url'], news_data['content']))
        conn.commit()
    except sqlite3.IntegrityError:
        pass  # URL重复

大规模数据(TB级别)或高并发场景需要专业数据库:PostgreSQL适合复杂查询和数据分析,MongoDB适合非结构化数据,Elasticsearch适合全文搜索。但这些需要额外的基础设施和运维成本。

决策建议:首先评估数据规模和增长速度,然后考虑查询需求(简单查询用文件,复杂查询用数据库),考虑更新频率(频繁更新用数据库),考虑部署环境(本地用SQLite,云上用云数据库)。大多数爬虫项目从JSON/SQLite开始,需要时再升级。

对于长期项目,建议设计数据访问层抽象,底层存储可以替换而不影响上层逻辑。使用ORM(如SQLAlchemy)可以简化数据库操作,同时支持多种数据库后端。


总结

本文全面介绍了网络数据抓取与处理的核心技术。我们学习了HTTP协议的基础、requests库的使用、BeautifulSoup解析HTML、API调用获取数据、数据清洗确保质量,以及构建完整爬虫项目的实践方法。

网络数据抓取是自动化获取信息的重要手段,但必须遵守道德规范和法律边界。合理的爬虫应该对服务器友好、遵守robots.txt、尊重数据所有权。掌握这些技术,可以从互联网获取有价值的数据,为分析和决策提供支持。

下篇预告

下一篇我们将深入探讨办公文档自动化处理,带你了解如何自动化处理Excel表格、Word文档、PDF文件和邮件发送。你将学会使用Python处理各种办公文档,大幅提高工作效率。