Skip to content

Latest commit

 

History

History
1139 lines (878 loc) · 31.4 KB

File metadata and controls

1139 lines (878 loc) · 31.4 KB

代理 IP 的使用与管理

当你的爬虫遭遇 IP 封禁时,代理 IP 就成了必不可少的工具。本章将深入讲解代理 IP 的原理、类型选择,以及如何设计和实现一个实用的代理池管理系统。

代理 IP 基础

什么是代理 IP

代理服务器(Proxy Server)是一种位于客户端和目标服务器之间的中间服务器。当你通过代理发送请求时:

  1. 你的请求先发送到代理服务器
  2. 代理服务器代替你向目标服务器发送请求
  3. 目标服务器将响应返回给代理服务器
  4. 代理服务器再将响应转发给你

这样,目标服务器看到的是代理服务器的 IP,而不是你的真实 IP。

代理类型详解

按协议分类

类型 特点 使用场景
HTTP 代理 只支持 HTTP 协议 普通网页爬取
HTTPS 代理 支持 HTTPS 协议 加密网站爬取
SOCKS4 代理 支持 TCP 连接 需要更底层控制
SOCKS5 代理 支持 TCP/UDP,可认证 最灵活的代理类型

按匿名度分类

类型 特点 识别难度
透明代理 目标服务器能看到你的真实 IP 容易被识别
匿名代理 隐藏真实 IP,但暴露代理身份 中等
高匿代理 完全隐藏真实 IP 和代理身份 较难识别

对于爬虫来说,高匿代理是首选。

按来源分类

类型 优点 缺点
免费代理 成本为零 不稳定、速度慢、可用率低
付费代理 稳定、速度快、可用率高 需要成本
自建代理 完全可控 需要服务器资源

代理提供商选择指南

选择代理提供商时需要考虑:

  1. IP 质量:是否是高匿代理,是否已被目标网站封禁
  2. IP 数量:IP 池的规模
  3. 地理分布:是否覆盖目标地区
  4. 稳定性:连接成功率和响应速度
  5. 价格:按流量计费还是按 IP 数计费
  6. API 支持:是否提供便捷的 API 获取代理

常见的代理类型:

  • API 提取型:通过 API 获取代理 IP 列表
  • 隧道代理型:固定入口,自动轮换 IP
  • 动态转发型:每次请求自动更换 IP

代理池设计

为什么需要代理池

直接使用单个代理存在以下问题:

  • 代理可能随时失效
  • 单个 IP 容易被封禁
  • 无法动态切换和管理

代理池可以解决这些问题:

  • 统一管理多个代理
  • 自动检测代理有效性
  • 智能分配和轮换代理
  • 记录代理质量评分

代理池架构设计

graph TB
    subgraph ProxyPoolManager["代理池管理器"]
        direction TB

        subgraph Components["核心组件"]
            direction LR
            Fetcher["代理获取器<br/>(Fetcher)"]
            Checker["代理检测器<br/>(Checker)"]
            Allocator["代理分配器<br/>(Allocator)"]
        end

        Storage[("代理存储<br/>(内存 / Redis)")]

        Fetcher --> Storage
        Checker --> Storage
        Allocator --> Storage
    end

    style ProxyPoolManager fill:#e8f4f8,stroke:#0288d1
    style Storage fill:#fff3e0,stroke:#ff9800
Loading

代理池工作流程

flowchart LR
    subgraph 获取阶段
        API["代理API"] --> Fetcher["获取器"]
        Free["免费代理"] --> Fetcher
    end

    subgraph 检测阶段
        Fetcher --> Checker["检测器"]
        Checker -->|有效| Pool[("代理池")]
        Checker -->|无效| Discard["丢弃"]
    end

    subgraph 分配阶段
        Pool --> Allocator["分配器"]
        Allocator --> Crawler["爬虫请求"]
        Crawler -->|成功| Score["评分+1"]
        Crawler -->|失败| Penalty["评分-1"]
        Score --> Pool
        Penalty --> Pool
    end

    style Pool fill:#c8e6c9,stroke:#4caf50
    style Discard fill:#ffcdd2,stroke:#f44336
Loading

核心接口设计

from abc import ABC, abstractmethod
from typing import Optional, List
from dataclasses import dataclass
from enum import Enum


class ProxyProtocol(Enum):
    """代理协议"""
    HTTP = "http"
    HTTPS = "https"
    SOCKS5 = "socks5"


@dataclass
class ProxyInfo:
    """代理信息"""
    host: str
    port: int
    protocol: ProxyProtocol = ProxyProtocol.HTTP
    username: Optional[str] = None
    password: Optional[str] = None

    # 质量指标
    success_count: int = 0
    fail_count: int = 0
    avg_response_time: float = 0.0
    last_check_time: float = 0.0

    @property
    def url(self) -> str:
        """构建代理 URL"""
        auth = ""
        if self.username and self.password:
            auth = f"{self.username}:{self.password}@"
        return f"{self.protocol.value}://{auth}{self.host}:{self.port}"

    @property
    def score(self) -> float:
        """计算代理评分"""
        total = self.success_count + self.fail_count
        if total == 0:
            return 0.5  # 未测试的代理给中等分数
        success_rate = self.success_count / total
        # 考虑响应时间,越快分数越高
        time_score = max(0, 1 - self.avg_response_time / 10)
        return success_rate * 0.7 + time_score * 0.3


class IProxyFetcher(ABC):
    """代理获取器接口"""

    @abstractmethod
    async def fetch(self) -> List[ProxyInfo]:
        """获取代理列表"""
        pass


class IProxyChecker(ABC):
    """代理检测器接口"""

    @abstractmethod
    async def check(self, proxy: ProxyInfo) -> bool:
        """检测代理是否可用"""
        pass


class IProxyPool(ABC):
    """代理池接口"""

    @abstractmethod
    async def get_proxy(self) -> Optional[ProxyInfo]:
        """获取一个可用代理"""
        pass

    @abstractmethod
    async def return_proxy(self, proxy: ProxyInfo, success: bool):
        """归还代理并报告使用结果"""
        pass

    @abstractmethod
    async def add_proxy(self, proxy: ProxyInfo):
        """添加代理"""
        pass

    @abstractmethod
    async def remove_proxy(self, proxy: ProxyInfo):
        """移除代理"""
        pass

代理获取器实现

免费代理获取(仅供学习)

import httpx
from typing import List
from loguru import logger


class FreeProxyFetcher(IProxyFetcher):
    """
    免费代理获取器

    注意:免费代理质量较差,仅供学习测试使用
    """

    async def fetch(self) -> List[ProxyInfo]:
        """从免费代理网站获取代理"""
        proxies = []

        # 示例:从 API 获取(这里用一个示例 API)
        try:
            async with httpx.AsyncClient(timeout=10) as client:
                # 这是一个示例 URL,实际使用时替换为真实的代理 API
                response = await client.get(
                    "https://api.proxyscrape.com/v2/"
                    "?request=getproxies&protocol=http&timeout=10000&country=all"
                )

                if response.status_code == 200:
                    lines = response.text.strip().split("\n")
                    for line in lines:
                        try:
                            host, port = line.strip().split(":")
                            proxies.append(ProxyInfo(
                                host=host,
                                port=int(port),
                                protocol=ProxyProtocol.HTTP
                            ))
                        except ValueError:
                            continue

                logger.info(f"获取到 {len(proxies)} 个免费代理")

        except Exception as e:
            logger.error(f"获取免费代理失败: {e}")

        return proxies

API 代理获取器

class APIProxyFetcher(IProxyFetcher):
    """
    API 代理获取器

    从付费代理服务商的 API 获取代理
    """

    def __init__(
        self,
        api_url: str,
        api_key: Optional[str] = None,
        count: int = 10
    ):
        """
        初始化 API 代理获取器

        Args:
            api_url: API 地址
            api_key: API 密钥
            count: 每次获取数量
        """
        self.api_url = api_url
        self.api_key = api_key
        self.count = count

    async def fetch(self) -> List[ProxyInfo]:
        """从 API 获取代理"""
        proxies = []

        try:
            params = {"num": self.count}
            if self.api_key:
                params["key"] = self.api_key

            async with httpx.AsyncClient(timeout=10) as client:
                response = await client.get(self.api_url, params=params)
                data = response.json()

                # 根据实际 API 返回格式解析
                # 这里假设返回 {"data": [{"ip": "x.x.x.x", "port": 8080}, ...]}
                for item in data.get("data", []):
                    proxies.append(ProxyInfo(
                        host=item["ip"],
                        port=item["port"],
                        protocol=ProxyProtocol(item.get("protocol", "http"))
                    ))

                logger.info(f"从 API 获取到 {len(proxies)} 个代理")

        except Exception as e:
            logger.error(f"从 API 获取代理失败: {e}")

        return proxies

代理检测器实现

import time
import httpx
from typing import Optional


class ProxyChecker(IProxyChecker):
    """
    代理检测器

    检测代理的可用性和响应速度
    """

    # 用于检测的 URL 列表
    CHECK_URLS = [
        "https://httpbin.org/ip",
        "https://api.ipify.org?format=json",
    ]

    def __init__(self, timeout: int = 10):
        """
        初始化检测器

        Args:
            timeout: 检测超时时间
        """
        self.timeout = timeout

    async def check(self, proxy: ProxyInfo) -> bool:
        """
        检测代理是否可用

        Args:
            proxy: 代理信息

        Returns:
            代理是否可用
        """
        start_time = time.time()

        try:
            async with httpx.AsyncClient(
                proxies=proxy.url,
                timeout=self.timeout
            ) as client:
                for url in self.CHECK_URLS:
                    try:
                        response = await client.get(url)
                        if response.status_code == 200:
                            # 更新响应时间
                            response_time = time.time() - start_time
                            proxy.avg_response_time = (
                                proxy.avg_response_time * 0.7 +
                                response_time * 0.3
                            )
                            proxy.last_check_time = time.time()
                            logger.debug(
                                f"代理可用: {proxy.host}:{proxy.port}, "
                                f"响应时间: {response_time:.2f}s"
                            )
                            return True
                    except Exception:
                        continue

        except Exception as e:
            logger.debug(f"代理检测失败: {proxy.host}:{proxy.port} - {e}")

        return False

    async def check_batch(
        self,
        proxies: List[ProxyInfo],
        concurrency: int = 20
    ) -> List[ProxyInfo]:
        """
        批量检测代理

        Args:
            proxies: 代理列表
            concurrency: 并发数

        Returns:
            可用的代理列表
        """
        import asyncio

        semaphore = asyncio.Semaphore(concurrency)
        valid_proxies = []

        async def check_one(proxy: ProxyInfo):
            async with semaphore:
                if await self.check(proxy):
                    valid_proxies.append(proxy)

        tasks = [check_one(p) for p in proxies]
        await asyncio.gather(*tasks, return_exceptions=True)

        logger.info(f"检测完成: {len(valid_proxies)}/{len(proxies)} 可用")
        return valid_proxies

代理池实现

import asyncio
import random
import time
from typing import Optional, List, Dict
from collections import defaultdict
from loguru import logger


class ProxyPool(IProxyPool):
    """
    代理池实现

    特性:
    - 自动获取和检测代理
    - 基于评分的智能分配
    - 自动淘汰失效代理
    - 支持代理预热
    """

    def __init__(
        self,
        fetcher: IProxyFetcher,
        checker: IProxyChecker,
        min_proxies: int = 10,
        max_proxies: int = 100,
        check_interval: int = 300,
        max_fail_count: int = 3
    ):
        """
        初始化代理池

        Args:
            fetcher: 代理获取器
            checker: 代理检测器
            min_proxies: 最小代理数量
            max_proxies: 最大代理数量
            check_interval: 检测间隔(秒)
            max_fail_count: 最大失败次数
        """
        self.fetcher = fetcher
        self.checker = checker
        self.min_proxies = min_proxies
        self.max_proxies = max_proxies
        self.check_interval = check_interval
        self.max_fail_count = max_fail_count

        # 代理存储
        self._proxies: Dict[str, ProxyInfo] = {}
        self._lock = asyncio.Lock()

        # 后台任务
        self._refresh_task: Optional[asyncio.Task] = None
        self._running = False

    def _proxy_key(self, proxy: ProxyInfo) -> str:
        """生成代理唯一标识"""
        return f"{proxy.host}:{proxy.port}"

    async def start(self):
        """启动代理池"""
        self._running = True

        # 初始获取代理
        await self._refresh_proxies()

        # 启动后台刷新任务
        self._refresh_task = asyncio.create_task(self._refresh_loop())

        logger.info("代理池已启动")

    async def stop(self):
        """停止代理池"""
        self._running = False

        if self._refresh_task:
            self._refresh_task.cancel()
            try:
                await self._refresh_task
            except asyncio.CancelledError:
                pass

        logger.info("代理池已停止")

    async def _refresh_loop(self):
        """后台刷新循环"""
        while self._running:
            try:
                await asyncio.sleep(self.check_interval)
                await self._refresh_proxies()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"代理刷新异常: {e}")

    async def _refresh_proxies(self):
        """刷新代理"""
        async with self._lock:
            # 检查是否需要补充代理
            if len(self._proxies) >= self.min_proxies:
                return

            logger.info(f"代理不足 ({len(self._proxies)}/{self.min_proxies}),开始获取...")

            # 获取新代理
            new_proxies = await self.fetcher.fetch()

            # 检测代理
            valid_proxies = await self.checker.check_batch(new_proxies)

            # 添加到池中
            for proxy in valid_proxies:
                key = self._proxy_key(proxy)
                if key not in self._proxies and len(self._proxies) < self.max_proxies:
                    self._proxies[key] = proxy

            logger.info(f"代理池更新完成,当前数量: {len(self._proxies)}")

    async def get_proxy(self) -> Optional[ProxyInfo]:
        """
        获取一个可用代理

        使用加权随机选择,评分高的代理被选中概率更大
        """
        async with self._lock:
            if not self._proxies:
                logger.warning("代理池为空")
                return None

            # 计算权重
            proxies = list(self._proxies.values())
            weights = [max(p.score, 0.1) for p in proxies]

            # 加权随机选择
            selected = random.choices(proxies, weights=weights, k=1)[0]

            logger.debug(f"分配代理: {selected.host}:{selected.port} (评分: {selected.score:.2f})")
            return selected

    async def return_proxy(self, proxy: ProxyInfo, success: bool):
        """
        归还代理并报告使用结果

        Args:
            proxy: 代理信息
            success: 使用是否成功
        """
        async with self._lock:
            key = self._proxy_key(proxy)

            if key not in self._proxies:
                return

            stored_proxy = self._proxies[key]

            if success:
                stored_proxy.success_count += 1
            else:
                stored_proxy.fail_count += 1

                # 检查是否需要淘汰
                if stored_proxy.fail_count >= self.max_fail_count:
                    total = stored_proxy.success_count + stored_proxy.fail_count
                    if total > 5 and stored_proxy.score < 0.3:
                        del self._proxies[key]
                        logger.info(f"淘汰低质量代理: {proxy.host}:{proxy.port}")

    async def add_proxy(self, proxy: ProxyInfo):
        """添加代理"""
        async with self._lock:
            key = self._proxy_key(proxy)
            if key not in self._proxies and len(self._proxies) < self.max_proxies:
                self._proxies[key] = proxy

    async def remove_proxy(self, proxy: ProxyInfo):
        """移除代理"""
        async with self._lock:
            key = self._proxy_key(proxy)
            if key in self._proxies:
                del self._proxies[key]

    @property
    def size(self) -> int:
        """代理池大小"""
        return len(self._proxies)

    def get_stats(self) -> Dict:
        """获取统计信息"""
        if not self._proxies:
            return {"total": 0}

        proxies = list(self._proxies.values())
        scores = [p.score for p in proxies]

        return {
            "total": len(proxies),
            "avg_score": sum(scores) / len(scores),
            "max_score": max(scores),
            "min_score": min(scores),
        }

代理与爬虫集成

使用 httpx 设置代理

import httpx

async def fetch_with_proxy(url: str, proxy_url: str) -> str:
    """使用代理发送请求"""
    async with httpx.AsyncClient(proxies=proxy_url, timeout=30) as client:
        response = await client.get(url)
        return response.text


# 使用示例
proxy = "http://user:pass@127.0.0.1:8080"
content = await fetch_with_proxy("https://httpbin.org/ip", proxy)

集成代理池的爬虫

class ProxiedCrawler:
    """
    集成代理池的爬虫

    自动管理代理的获取、轮换和报告
    """

    def __init__(self, proxy_pool: ProxyPool):
        self.proxy_pool = proxy_pool

    async def fetch(self, url: str) -> Optional[str]:
        """使用代理获取页面"""
        proxy = await self.proxy_pool.get_proxy()

        if not proxy:
            logger.warning("无可用代理")
            return None

        try:
            async with httpx.AsyncClient(
                proxies=proxy.url,
                timeout=30
            ) as client:
                response = await client.get(url)
                response.raise_for_status()

                # 报告成功
                await self.proxy_pool.return_proxy(proxy, success=True)
                return response.text

        except Exception as e:
            logger.warning(f"请求失败: {url} - {e}")
            # 报告失败
            await self.proxy_pool.return_proxy(proxy, success=False)
            return None

隧道代理使用

隧道代理是一种特殊的代理模式,你只需连接到固定的代理入口,每次请求自动分配不同的 IP。

class TunnelProxyClient:
    """
    隧道代理客户端

    特点:固定入口,自动轮换 IP
    """

    def __init__(
        self,
        host: str,
        port: int,
        username: str,
        password: str
    ):
        self.proxy_url = f"http://{username}:{password}@{host}:{port}"

    async def get(self, url: str, **kwargs) -> httpx.Response:
        """发送请求(自动使用隧道代理)"""
        async with httpx.AsyncClient(
            proxies=self.proxy_url,
            timeout=30
        ) as client:
            return await client.get(url, **kwargs)


# 使用示例
tunnel = TunnelProxyClient(
    host="tunnel.example.com",
    port=12345,
    username="your_username",
    password="your_password"
)

# 每次请求自动使用不同 IP
response1 = await tunnel.get("https://httpbin.org/ip")
response2 = await tunnel.get("https://httpbin.org/ip")

代理使用最佳实践

IP 封禁机制分析

大多数网站都有反爬虫的 IP 封禁机制,常见的触发条件和处理方式:

flowchart TD
    Request["发起请求"] --> RateCheck{"频率检测"}
    RateCheck -->|正常| UACheck{"UA检测"}
    RateCheck -->|过快| Block429["429 限流"]

    UACheck -->|正常| BehaviorCheck{"行为检测"}
    UACheck -->|异常| Block412["412 风控"]

    BehaviorCheck -->|正常| Success["正常响应"]
    BehaviorCheck -->|异常| Block403["403 封禁"]

    Block429 --> IPMark["IP标记"]
    Block412 --> IPMark
    Block403 --> IPMark

    IPMark --> BlackList["IP黑名单"]

    style Success fill:#c8e6c9,stroke:#4caf50
    style Block429 fill:#fff3e0,stroke:#ff9800
    style Block412 fill:#ffcdd2,stroke:#f44336
    style Block403 fill:#ffcdd2,stroke:#f44336
    style BlackList fill:#ffcdd2,stroke:#f44336
Loading

常见 IP 封禁特点

触发条件 响应码 封禁时长 解封方式
请求频率过高 429 几分钟~1小时 降低频率后自动解封
风控检测触发 403 数小时~1天 需更换IP
严重违规 403/IP拉黑 数天~永久 需更换IP

代理有效性检测器

使用 httpbin.org 等测试服务来验证代理的可用性:

import time
import httpx
from typing import Optional
from loguru import logger


class SiteProxyChecker(IProxyChecker):
    """
    通用代理检测器

    使用 httpbin.org 检测代理可用性和匿名度
    """

    # 使用 httpbin.org 检测代理IP
    CHECK_URL = "https://httpbin.org/ip"

    # 通用请求头
    HEADERS = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                      "AppleWebKit/537.36 (KHTML, like Gecko) "
                      "Chrome/131.0.0.0 Safari/537.36",
        "Accept": "application/json"
    }

    def __init__(self, timeout: int = 10):
        self.timeout = timeout

    async def check(self, proxy: ProxyInfo) -> bool:
        """
        检测代理是否可用

        判断标准:
        - 请求成功(状态码200)
        - 响应包含有效JSON
        - 返回的IP与代理IP一致(验证代理生效)
        """
        start_time = time.time()

        try:
            async with httpx.AsyncClient(
                proxies=proxy.url,
                timeout=self.timeout,
                headers=self.HEADERS
            ) as client:
                response = await client.get(self.CHECK_URL)

                if response.status_code != 200:
                    logger.debug(f"代理状态码异常: {proxy.host}:{proxy.port} - {response.status_code}")
                    return False

                data = response.json()

                # 验证返回的IP(httpbin.org 返回 {"origin": "x.x.x.x"})
                origin_ip = data.get("origin", "")
                if not origin_ip:
                    logger.debug(f"代理响应异常: {proxy.host}:{proxy.port}")
                    return False

                # 更新响应时间
                response_time = time.time() - start_time
                proxy.avg_response_time = (
                    proxy.avg_response_time * 0.7 + response_time * 0.3
                )
                proxy.last_check_time = time.time()

                logger.debug(
                    f"代理可用: {proxy.host}:{proxy.port}, "
                    f"出口IP: {origin_ip}, 响应时间: {response_time:.2f}s"
                )
                return True

        except Exception as e:
            logger.debug(f"代理检测失败: {proxy.host}:{proxy.port} - {e}")
            return False

代理爬虫完整示例

下面展示一个完整的代理爬虫示例,使用 httpbin.org 作为测试目标:

import asyncio
import httpx
from typing import Optional, Dict, Any
from loguru import logger
from dataclasses import dataclass


@dataclass
class ProxyCrawlerConfig:
    """代理爬虫配置"""
    # 代理池配置
    min_proxies: int = 10
    max_proxies: int = 50

    # 请求配置
    request_timeout: int = 30
    max_retries: int = 3
    retry_delay: float = 1.0

    # 频率控制
    request_interval: float = 0.5  # 请求间隔(秒)


class ProxyCrawler:
    """
    代理爬虫

    特性:
    - 自动代理轮换
    - 智能重试
    - 频率控制
    - 错误处理
    """

    # 通用请求头
    DEFAULT_HEADERS = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                      "AppleWebKit/537.36 (KHTML, like Gecko) "
                      "Chrome/131.0.0.0 Safari/537.36",
        "Accept": "application/json, text/plain, */*",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
    }

    def __init__(
        self,
        proxy_pool: ProxyPool,
        config: Optional[ProxyCrawlerConfig] = None
    ):
        self.proxy_pool = proxy_pool
        self.config = config or ProxyCrawlerConfig()
        self._last_request_time = 0.0

    async def _wait_for_rate_limit(self):
        """频率控制"""
        import time
        elapsed = time.time() - self._last_request_time
        if elapsed < self.config.request_interval:
            await asyncio.sleep(self.config.request_interval - elapsed)
        self._last_request_time = time.time()

    async def _request(
        self,
        url: str,
        params: Optional[Dict] = None,
        headers: Optional[Dict] = None
    ) -> Optional[Dict[str, Any]]:
        """
        发送带代理的请求

        自动处理代理轮换和重试
        """
        await self._wait_for_rate_limit()

        merged_headers = {**self.DEFAULT_HEADERS, **(headers or {})}

        for attempt in range(self.config.max_retries):
            proxy = await self.proxy_pool.get_proxy()
            if not proxy:
                logger.warning("无可用代理,使用直连")
                proxy_url = None
            else:
                proxy_url = proxy.url

            try:
                async with httpx.AsyncClient(
                    proxies=proxy_url,
                    timeout=self.config.request_timeout,
                    headers=merged_headers
                ) as client:
                    response = await client.get(url, params=params)

                    # 处理响应
                    if response.status_code == 200:
                        if proxy:
                            await self.proxy_pool.return_proxy(proxy, success=True)
                        return response.json()

                    # HTTP错误
                    if response.status_code == 429:
                        logger.warning("请求频率过高,等待后重试")
                        if proxy:
                            await self.proxy_pool.return_proxy(proxy, success=False)
                        await asyncio.sleep(self.config.retry_delay * 2)
                        continue

                    if response.status_code in (403, 412):
                        logger.warning(f"IP被封禁 ({response.status_code}),切换代理")
                        if proxy:
                            await self.proxy_pool.return_proxy(proxy, success=False)
                        continue

                    # 其他错误
                    logger.warning(f"HTTP错误: {response.status_code}")
                    if proxy:
                        await self.proxy_pool.return_proxy(proxy, success=True)
                    return None

            except httpx.TimeoutException:
                logger.warning(f"请求超时,切换代理重试 (尝试 {attempt + 1})")
                if proxy:
                    await self.proxy_pool.return_proxy(proxy, success=False)
            except Exception as e:
                logger.error(f"请求异常: {e}")
                if proxy:
                    await self.proxy_pool.return_proxy(proxy, success=False)

        logger.error(f"请求失败,已达最大重试次数: {url}")
        return None

    async def get_with_proxy(self, url: str) -> Optional[Dict[str, Any]]:
        """
        使用代理获取URL

        Args:
            url: 目标URL

        Returns:
            响应数据
        """
        return await self._request(url)


# 使用示例
async def main():
    # 创建代理获取器
    fetcher = APIProxyFetcher(
        api_url="https://your-proxy-api.com/get",
        api_key="your_api_key",
        count=20
    )

    # 创建代理检测器
    checker = SiteProxyChecker(timeout=10)

    # 创建代理池
    pool = ProxyPool(
        fetcher=fetcher,
        checker=checker,
        min_proxies=10,
        max_proxies=50
    )

    # 启动代理池
    await pool.start()

    try:
        # 创建爬虫
        crawler = ProxyCrawler(pool)

        # 测试请求(使用 httpbin.org 验证代理生效)
        result = await crawler.get_with_proxy("https://httpbin.org/ip")
        if result:
            print(f"当前出口IP: {result.get('origin')}")

        # 测试 headers
        result = await crawler.get_with_proxy("https://httpbin.org/headers")
        if result:
            headers = result.get("headers", {})
            print(f"User-Agent: {headers.get('User-Agent', 'N/A')}")

        # 获取代理池统计
        stats = pool.get_stats()
        logger.info(f"代理池统计: {stats}")

    finally:
        await pool.stop()


if __name__ == "__main__":
    asyncio.run(main())

代理使用最佳实践

代理使用的一些通用建议:

graph LR
    subgraph 代理选择
        A1["高匿代理"] --> A2["住宅IP优先"]
        A2 --> A3["国内节点"]
    end

    subgraph 请求策略
        B1["控制频率"] --> B2["随机延迟"]
        B2 --> B3["失败轮换"]
    end

    subgraph 风控规避
        C1["完整请求头"] --> C2["Cookie携带"]
        C2 --> C3["行为模拟"]
    end

    A3 --> B1
    B3 --> C1

    style A1 fill:#e3f2fd,stroke:#2196f3
    style B1 fill:#fff3e0,stroke:#ff9800
    style C1 fill:#e8f5e9,stroke:#4caf50
Loading

关键建议

  1. 代理类型:大型网站对代理检测严格,推荐使用高匿住宅代理
  2. 请求频率:单IP建议 0.5-1 秒/请求,避免触发频率限制
  3. 完整请求头:必须携带 User-Agent、Accept 等头信息
  4. Cookie 携带:部分 API 需要登录态,代理请求也要携带 Cookie
  5. 失败处理:遇到 403/429 立即切换代理,避免 IP 被永久封禁

本章小结

本章我们学习了代理 IP 的完整知识体系:

  1. 代理基础:代理类型、匿名度、来源选择
  2. 代理池设计:获取器、检测器、分配器的接口设计
  3. 核心实现:代理获取、有效性检测、智能分配
  4. 爬虫集成:httpx 代理设置、自动轮换、隧道代理
  5. 最佳实践:代理检测、错误处理、使用建议

代理 IP 是大规模爬虫的基础设施,合理使用可以有效应对 IP 封禁。


下一章预告

下一章我们将学习「Playwright 浏览器自动化入门」。主要内容包括:

  • Playwright 的安装和基本使用
  • 页面导航和元素定位
  • 等待策略和超时处理
  • 截图和 PDF 导出
  • 爬取 JavaScript 渲染的页面

浏览器自动化是应对复杂反爬的利器,让我们一起探索!