Skip to content

Latest commit

 

History

History
2652 lines (2128 loc) · 87.6 KB

File metadata and controls

2652 lines (2128 loc) · 87.6 KB

Python标准库使用知识点

1. os和sys模块 - 系统操作基础

知识点解析

概念定义:os模块就像Python与操作系统的"翻译官",帮助Python程序与计算机系统进行交流,比如查看文件、创建文件夹、获取系统信息等。sys模块则是Python程序的"自我认知工具",可以获取程序的运行参数、Python版本等信息。

核心规则

  1. os模块用于与操作系统交互,处理文件和目录
  2. sys模块用于与Python解释器交互,处理程序参数和系统信息
  3. 使用os.path进行跨平台路径操作
  4. 使用sys.argv获取命令行参数

常见易错点

  1. 忘记处理文件或目录不存在的情况
  2. 硬编码路径分隔符导致跨平台兼容性问题
  3. 不检查命令行参数数量导致索引越界
  4. 忘记处理环境变量不存在的情况

实战案例

案例1:系统信息查看器

# 系统信息查看器
print("===系统信息查看器===")

import os
import sys
from datetime import datetime

class SystemInfoViewer:
    """系统信息查看器"""
    
    def __init__(self):
        """初始化系统信息查看器"""
        print("系统信息查看器已启动")
    
    def show_current_directory_info(self):
        """显示当前目录信息"""
        print("\n=== 当前目录信息 ===")
        
        # 获取当前工作目录
        current_dir = os.getcwd()
        print(f"当前目录: {current_dir}")
        
        # 列出当前目录内容
        try:
            items = os.listdir(current_dir)
            print(f"目录项数: {len(items)}")
            
            # 分类统计文件和目录
            files = []
            directories = []
            
            for item in items:
                item_path = os.path.join(current_dir, item)
                if os.path.isfile(item_path):
                    files.append(item)
                elif os.path.isdir(item_path):
                    directories.append(item)
            
            print(f"文件数量: {len(files)}")
            print(f"目录数量: {len(directories)}")
            
            # 显示前5个文件和目录
            if files:
                print("文件示例 (前5个):")
                for file in files[:5]:
                    print(f"  文件: {file}")
            
            if directories:
                print("目录示例 (前5个):")
                for directory in directories[:5]:
                    print(f"  目录: {directory}")
                    
        except PermissionError:
            print("没有权限访问当前目录")
        except Exception as e:
            print(f"获取目录信息时出错: {e}")
    
    def show_environment_info(self):
        """显示环境信息"""
        print("\n=== 环境信息 ===")
        
        # 显示Python版本
        print(f"Python版本: {sys.version}")
        print(f"Python可执行文件路径: {sys.executable}")
        
        # 显示平台信息
        print(f"操作系统平台: {sys.platform}")
        
        # 显示命令行参数
        print(f"命令行参数: {sys.argv}")
        
        # 显示环境变量
        print("\n重要环境变量:")
        important_vars = ["HOME", "PATH", "USER", "USERNAME", "PWD"]
        for var in important_vars:
            value = os.environ.get(var, "未设置")
            if var == "PATH":
                # PATH变量通常很长,只显示前100个字符
                value = value[:100] + "..." if len(value) > 100 else value
            print(f"  {var}: {value}")
    
    def show_system_paths(self):
        """显示系统路径"""
        print("\n=== Python模块搜索路径 ===")
        
        # 显示模块搜索路径
        print("模块搜索路径 (前10个):")
        for i, path in enumerate(sys.path[:10]):
            if path:  # 跳过空路径
                print(f"  {i+1}. {path}")
            else:
                print(f"  {i+1}. (当前目录)")
        
        # 如果路径超过10个,显示总数
        if len(sys.path) > 10:
            print(f"  ... 还有 {len(sys.path) - 10} 个路径")
    
    def create_test_structure(self):
        """创建测试目录结构"""
        print("\n=== 创建测试目录结构 ===")
        
        # 创建测试目录
        test_dirs = ["test_project", "test_project/docs", "test_project/src", "test_project/tests"]
        for directory in test_dirs:
            try:
                os.makedirs(directory, exist_ok=True)
                print(f"已创建目录: {directory}")
            except Exception as e:
                print(f"创建目录 {directory} 时出错: {e}")
        
        # 创建测试文件
        test_files = {
            "test_project/README.md": "# 测试项目\n这是一个测试项目",
            "test_project/docs/intro.txt": "项目介绍文档",
            "test_project/src/main.py": "print('Hello, World!')",
            "test_project/tests/test_main.py": "def test_example():\n    assert True"
        }
        
        for file_path, content in test_files.items():
            try:
                with open(file_path, "w", encoding="utf-8") as f:
                    f.write(content)
                print(f"已创建文件: {file_path}")
            except Exception as e:
                print(f"创建文件 {file_path} 时出错: {e}")
    
    def cleanup_test_structure(self):
        """清理测试目录结构"""
        print("\n=== 清理测试目录结构 ===")
        
        import shutil
        try:
            if os.path.exists("test_project"):
                shutil.rmtree("test_project")
                print("已清理测试目录结构")
            else:
                print("测试目录不存在")
        except Exception as e:
            print(f"清理测试目录时出错: {e}")

# 使用系统信息查看器
print("创建系统信息查看器:")
viewer = SystemInfoViewer()

# 显示当前目录信息
viewer.show_current_directory_info()

# 显示环境信息
viewer.show_environment_info()

# 显示系统路径
viewer.show_system_paths()

# 创建并清理测试结构
viewer.create_test_structure()
viewer.cleanup_test_structure()

案例2:文件目录管理工具

# 文件目录管理工具
print("\n===文件目录管理工具===")

import os
import sys
from pathlib import Path
import shutil

class FileManager:
    """文件目录管理工具"""
    
    def __init__(self):
        """初始化文件管理工具"""
        print("文件目录管理工具已启动")
    
    def create_directory(self, path):
        """
        创建目录
        
        参数:
            path (str): 目录路径
        """
        try:
            Path(path).mkdir(parents=True, exist_ok=True)
            print(f"目录已创建: {path}")
            return True
        except Exception as e:
            print(f"创建目录 {path} 失败: {e}")
            return False
    
    def remove_directory(self, path):
        """
        删除目录
        
        参数:
            path (str): 目录路径
        """
        try:
            if os.path.exists(path):
                if os.path.isdir(path):
                    shutil.rmtree(path)
                    print(f"目录已删除: {path}")
                else:
                    print(f"路径 {path} 不是目录")
            else:
                print(f"目录不存在: {path}")
            return True
        except Exception as e:
            print(f"删除目录 {path} 失败: {e}")
            return False
    
    def copy_file(self, src, dst):
        """
        复制文件
        
        参数:
            src (str): 源文件路径
            dst (str): 目标文件路径
        """
        try:
            shutil.copy2(src, dst)
            print(f"文件已复制: {src} -> {dst}")
            return True
        except Exception as e:
            print(f"复制文件 {src} -> {dst} 失败: {e}")
            return False
    
    def move_file(self, src, dst):
        """
        移动文件
        
        参数:
            src (str): 源文件路径
            dst (str): 目标文件路径
        """
        try:
            shutil.move(src, dst)
            print(f"文件已移动: {src} -> {dst}")
            return True
        except Exception as e:
            print(f"移动文件 {src} -> {dst} 失败: {e}")
            return False
    
    def list_directory_tree(self, path, level=0):
        """
        列出目录树结构
        
        参数:
            path (str): 目录路径
            level (int): 缩进级别
        """
        try:
            path_obj = Path(path)
            if not path_obj.exists():
                print(f"路径不存在: {path}")
                return
            
            indent = "  " * level
            print(f"{indent}{path_obj.name}/")
            
            if path_obj.is_dir():
                for item in path_obj.iterdir():
                    if item.is_dir():
                        self.list_directory_tree(str(item), level + 1)
                    else:
                        print(f"{'  ' * (level + 1)}{item.name}")
        except Exception as e:
            print(f"列出目录树 {path} 失败: {e}")
    
    def search_files(self, directory, pattern):
        """
        搜索文件
        
        参数:
            directory (str): 搜索目录
            pattern (str): 文件名模式
        """
        try:
            found_files = []
            directory_path = Path(directory)
            
            if not directory_path.exists():
                print(f"目录不存在: {directory}")
                return found_files
            
            # 递归搜索文件
            for file_path in directory_path.rglob(pattern):
                if file_path.is_file():
                    found_files.append(str(file_path))
            
            return found_files
        except Exception as e:
            print(f"搜索文件失败: {e}")
            return []
    
    def get_file_info(self, file_path):
        """
        获取文件信息
        
        参数:
            file_path (str): 文件路径
        """
        try:
            path_obj = Path(file_path)
            if not path_obj.exists():
                print(f"文件不存在: {file_path}")
                return None
            
            stat = path_obj.stat()
            info = {
                "路径": str(path_obj),
                "大小": f"{stat.st_size} 字节",
                "创建时间": datetime.fromtimestamp(stat.st_ctime).strftime("%Y-%m-%d %H:%M:%S"),
                "修改时间": datetime.fromtimestamp(stat.st_mtime).strftime("%Y-%m-%d %H:%M:%S"),
                "是否为文件": path_obj.is_file(),
                "是否为目录": path_obj.is_dir()
            }
            
            return info
        except Exception as e:
            print(f"获取文件信息 {file_path} 失败: {e}")
            return None

# 使用文件管理工具
print("创建文件管理工具:")
file_manager = FileManager()

# 创建测试目录结构
print("\n创建测试结构:")
file_manager.create_directory("project")
file_manager.create_directory("project/src")
file_manager.create_directory("project/docs")
file_manager.create_directory("project/tests")

# 创建测试文件
test_files = {
    "project/README.md": "# 项目说明\n这是项目说明文件",
    "project/src/main.py": "def main():\n    print('Hello, World!')\n\nif __name__ == '__main__':\n    main()",
    "project/src/utils.py": "def utility_function():\n    return '这是一个工具函数'",
    "project/docs/guide.md": "# 使用指南\n详细使用说明",
    "project/tests/test_main.py": "def test_main():\n    assert True"
}

for file_path, content in test_files.items():
    try:
        with open(file_path, "w", encoding="utf-8") as f:
            f.write(content)
        print(f"已创建文件: {file_path}")
    except Exception as e:
        print(f"创建文件 {file_path} 失败: {e}")

# 显示目录树
print("\n项目目录树:")
file_manager.list_directory_tree("project")

# 搜索文件
print("\n搜索Python文件:")
py_files = file_manager.search_files("project", "*.py")
for file in py_files:
    print(f"  找到: {file}")

# 获取文件信息
print("\n文件信息:")
info = file_manager.get_file_info("project/src/main.py")
if info:
    for key, value in info.items():
        print(f"  {key}: {value}")

# 复制和移动文件
print("\n文件操作:")
file_manager.copy_file("project/README.md", "project/README_COPY.md")
file_manager.move_file("project/docs/guide.md", "project/GUIDE.md")

# 显示更新后的目录树
print("\n更新后的目录树:")
file_manager.list_directory_tree("project")

# 清理测试文件
import shutil
shutil.rmtree("project", ignore_errors=True)

代码说明

案例1代码解释

  1. os.getcwd():获取当前工作目录
  2. os.listdir(current_dir):列出目录中的所有项目
  3. sys.version:获取Python版本信息
  4. os.environ.get(var, "未设置"):安全地获取环境变量

如果直接访问os.environ[var]而变量不存在,会抛出KeyError异常,使用get方法可以避免这个问题。

案例2代码解释

  1. Path(path).mkdir(parents=True, exist_ok=True):创建目录,自动创建父目录
  2. shutil.rmtree(path):递归删除目录及其内容
  3. path_obj.iterdir():遍历目录中的项目
  4. path_obj.rglob(pattern):递归搜索匹配模式的文件

如果忘记使用parents=True参数,当父目录不存在时会抛出FileNotFoundError异常。

2. datetime和time模块 - 时间日期处理

知识点解析

概念定义:datetime模块就像一个"时间魔法师",可以帮我们处理各种与时间相关的问题,比如获取当前时间、计算时间差、格式化时间显示等。time模块则更底层,提供了时间访问和转换的基本功能。

核心规则

  1. datetime模块提供日期时间的高级操作
  2. time模块提供底层时间功能
  3. 使用strftime()格式化时间,strptime()解析时间字符串
  4. timedelta用于时间计算

常见易错点

  1. 混淆datetime对象和字符串表示
  2. 时区处理不当导致时间错误
  3. 忘记处理时间解析异常
  4. 时间计算时忽略闰秒等特殊情况

实战案例

案例1:个人时间管理器

# 个人时间管理器
print("===个人时间管理器===")

from datetime import datetime, date, time, timedelta
import time as time_module  # 为了避免与datetime.time冲突

class TimeManager:
    """个人时间管理器"""
    
    def __init__(self):
        """初始化时间管理器"""
        self.events = []
        print("个人时间管理器已启动")
    
    def add_event(self, name, event_datetime, duration_hours=1):
        """
        添加事件
        
        参数:
            name (str): 事件名称
            event_datetime (datetime): 事件时间
            duration_hours (int): 持续时间(小时)
        """
        event = {
            "name": name,
            "datetime": event_datetime,
            "duration": timedelta(hours=duration_hours),
            "created_at": datetime.now()
        }
        
        self.events.append(event)
        print(f"已添加事件: {name}{event_datetime.strftime('%Y-%m-%d %H:%M')}")
    
    def show_upcoming_events(self, days=7):
        """
        显示即将到来的事件
        
        参数:
            days (int): 显示未来几天的事件
        """
        print(f"\n=== 未来 {days} 天的事件 ===")
        
        now = datetime.now()
        future_limit = now + timedelta(days=days)
        
        upcoming_events = [
            event for event in self.events
            if now <= event["datetime"] <= future_limit
        ]
        
        if not upcoming_events:
            print("暂无即将到来的事件")
            return
        
        # 按时间排序
        upcoming_events.sort(key=lambda x: x["datetime"])
        
        for event in upcoming_events:
            time_str = event["datetime"].strftime("%Y-%m-%d %H:%M")
            print(f"  {time_str}: {event['name']}")
    
    def show_events_on_date(self, target_date):
        """
        显示指定日期的事件
        
        参数:
            target_date (date): 目标日期
        """
        print(f"\n=== {target_date.strftime('%Y年%m月%d日')} 的事件 ===")
        
        date_events = [
            event for event in self.events
            if event["datetime"].date() == target_date
        ]
        
        if not date_events:
            print("当天没有事件安排")
            return
        
        # 按时间排序
        date_events.sort(key=lambda x: x["datetime"])
        
        for event in date_events:
            time_str = event["datetime"].strftime("%H:%M")
            end_time = event["datetime"] + event["duration"]
            end_time_str = end_time.strftime("%H:%M")
            print(f"  {time_str}-{end_time_str}: {event['name']}")
    
    def calculate_time_until_event(self, event_name):
        """
        计算距离事件的时间
        
        参数:
            event_name (str): 事件名称
        """
        print(f"\n=== 距离 '{event_name}' 的时间 ===")
        
        for event in self.events:
            if event["name"] == event_name:
                now = datetime.now()
                if event["datetime"] > now:
                    time_diff = event["datetime"] - now
                    days = time_diff.days
                    hours, remainder = divmod(time_diff.seconds, 3600)
                    minutes, _ = divmod(remainder, 60)
                    
                    print(f"距离 {event_name} 还有 {days}{hours} 小时 {minutes} 分钟")
                elif event["datetime"] < now:
                    time_diff = now - event["datetime"]
                    days = time_diff.days
                    print(f"{event_name} 已经过了 {days} 天")
                else:
                    print(f"{event_name} 正在进行中")
                return
        
        print(f"未找到事件: {event_name}")
    
    def show_current_time_info(self):
        """显示当前时间信息"""
        print("\n=== 当前时间信息 ===")
        
        now = datetime.now()
        print(f"当前时间: {now.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"星期: {now.strftime('%A')}")
        print(f"月份: {now.strftime('%B')}")
        print(f"一年中的第 {now.timetuple().tm_yday} 天")
        print(f"一周中的第 {now.weekday() + 1} 天")
        
        # 显示时间戳
        timestamp = time_module.time()
        print(f"时间戳: {timestamp}")
    
    def format_time_examples(self):
        """显示时间格式化示例"""
        print("\n=== 时间格式化示例 ===")
        
        sample_time = datetime(2023, 12, 25, 14, 30, 45)
        formats = [
            ("%Y-%m-%d %H:%M:%S", "标准格式"),
            ("%Y年%m月%d日 %H时%M分%S秒", "中文格式"),
            ("%A, %B %d, %Y", "英文格式"),
            ("%Y-%m-%d", "日期格式"),
            ("%H:%M:%S", "时间格式"),
            ("%Y-%m-%d %I:%M %p", "12小时制格式")
        ]
        
        for format_str, description in formats:
            formatted = sample_time.strftime(format_str)
            print(f"  {description}: {formatted}")
    
    def parse_time_examples(self):
        """显示时间解析示例"""
        print("\n=== 时间解析示例 ===")
        
        time_strings = [
            ("2023-12-25 14:30:45", "%Y-%m-%d %H:%M:%S"),
            ("25/12/2023", "%d/%m/%Y"),
            ("Dec 25, 2023", "%b %d, %Y"),
            ("2023-12-25", "%Y-%m-%d")
        ]
        
        for time_str, format_str in time_strings:
            try:
                parsed = datetime.strptime(time_str, format_str)
                print(f"  '{time_str}' -> {parsed}")
            except ValueError as e:
                print(f"  '{time_str}' 解析失败: {e}")

# 使用时间管理器
print("创建时间管理器:")
time_manager = TimeManager()

# 显示当前时间信息
time_manager.show_current_time_info()

# 显示格式化示例
time_manager.format_time_examples()

# 显示解析示例
time_manager.parse_time_examples()

# 添加示例事件
now = datetime.now()
time_manager.add_event("团队会议", now + timedelta(days=1, hours=2), 2)
time_manager.add_event("项目截止", now + timedelta(days=3), 1)
time_manager.add_event("生日聚会", now + timedelta(days=7, hours=18), 3)
time_manager.add_event("年度总结", date(2024, 1, 15), 2)

# 显示即将到来的事件
time_manager.show_upcoming_events(10)

# 显示指定日期的事件
tomorrow = (now + timedelta(days=1)).date()
time_manager.show_events_on_date(tomorrow)

# 计算距离事件的时间
time_manager.calculate_time_until_event("团队会议")

案例2:工作时间计算器

# 工作时间计算器
print("\n===工作时间计算器===")

from datetime import datetime, timedelta, time
import json

class WorkTimeCalculator:
    """工作时间计算器"""
    
    def __init__(self):
        """初始化工作时间计算器"""
        self.work_records = []
        print("工作时间计算器已启动")
    
    def start_work(self, task_name):
        """
        开始工作
        
        参数:
            task_name (str): 任务名称
        """
        start_time = datetime.now()
        record = {
            "task_name": task_name,
            "start_time": start_time,
            "end_time": None,
            "duration": None
        }
        
        self.work_records.append(record)
        print(f"开始工作: {task_name} (开始时间: {start_time.strftime('%H:%M:%S')})")
    
    def end_work(self):
        """结束当前工作"""
        if not self.work_records:
            print("没有正在进行的工作")
            return
        
        # 找到最后一个未结束的工作
        current_work = None
        for record in reversed(self.work_records):
            if record["end_time"] is None:
                current_work = record
                break
        
        if current_work is None:
            print("没有正在进行的工作")
            return
        
        end_time = datetime.now()
        current_work["end_time"] = end_time
        current_work["duration"] = end_time - current_work["start_time"]
        
        duration_str = str(current_work["duration"]).split('.')[0]  # 移除微秒部分
        print(f"结束工作: {current_work['task_name']} (结束时间: {end_time.strftime('%H:%M:%S')}, 持续时间: {duration_str})")
    
    def get_daily_summary(self, target_date=None):
        """
        获取每日工作摘要
        
        参数:
            target_date (date): 目标日期,默认为今天
        """
        if target_date is None:
            target_date = datetime.now().date()
        
        print(f"\n=== {target_date.strftime('%Y年%m月%d日')} 工作摘要 ===")
        
        # 筛选指定日期的工作记录
        daily_records = [
            record for record in self.work_records
            if record["start_time"].date() == target_date and record["duration"] is not None
        ]
        
        if not daily_records:
            print("当天没有完成的工作记录")
            return
        
        total_duration = timedelta()
        print("工作记录:")
        for record in daily_records:
            duration_str = str(record["duration"]).split('.')[0]
            start_str = record["start_time"].strftime('%H:%M')
            end_str = record["end_time"].strftime('%H:%M')
            print(f"  {record['task_name']}: {start_str}-{end_str} ({duration_str})")
            total_duration += record["duration"]
        
        total_hours = total_duration.total_seconds() / 3600
        print(f"\n总工作时间: {str(total_duration).split('.')[0]} ({total_hours:.2f} 小时)")
    
    def get_weekly_summary(self, weeks_ago=0):
        """
        获取周工作摘要
        
        参数:
            weeks_ago (int): 几周前,默认为本周
        """
        today = datetime.now().date()
        # 计算周一日期
        monday = today - timedelta(days=today.weekday()) - timedelta(weeks=weeks_ago)
        sunday = monday + timedelta(days=6)
        
        print(f"\n=== {monday.strftime('%Y年%m月%d日')}-{sunday.strftime('%m月%d日')} 周工作摘要 ===")
        
        # 筛选本周的工作记录
        weekly_records = [
            record for record in self.work_records
            if monday <= record["start_time"].date() <= sunday and record["duration"] is not None
        ]
        
        if not weekly_records:
            print("本周没有完成的工作记录")
            return
        
        # 按日期分组
        daily_totals = {}
        task_totals = {}
        
        for record in weekly_records:
            record_date = record["start_time"].date()
            if record_date not in daily_totals:
                daily_totals[record_date] = timedelta()
            daily_totals[record_date] += record["duration"]
            
            task_name = record["task_name"]
            if task_name not in task_totals:
                task_totals[task_name] = timedelta()
            task_totals[task_name] += record["duration"]
        
        # 显示每日总计
        print("每日工作时间:")
        for date_key in sorted(daily_totals.keys()):
            duration = daily_totals[date_key]
            hours = duration.total_seconds() / 3600
            print(f"  {date_key.strftime('%m月%d日')}: {str(duration).split('.')[0]} ({hours:.2f} 小时)")
        
        # 显示任务总计
        print("\n任务时间分布:")
        sorted_tasks = sorted(task_totals.items(), key=lambda x: x[1], reverse=True)
        total_week_duration = sum(task_totals.values(), timedelta())
        total_week_hours = total_week_duration.total_seconds() / 3600
        
        for task_name, duration in sorted_tasks:
            hours = duration.total_seconds() / 3600
            percentage = (hours / total_week_hours) * 100 if total_week_hours > 0 else 0
            print(f"  {task_name}: {str(duration).split('.')[0]} ({hours:.2f} 小时, {percentage:.1f}%)")
        
        print(f"\n本周总工作时间: {str(total_week_duration).split('.')[0]} ({total_week_hours:.2f} 小时)")
    
    def export_records(self, filename):
        """
        导出工作记录
        
        参数:
            filename (str): 导出文件名
        """
        export_data = []
        for record in self.work_records:
            export_record = {
                "task_name": record["task_name"],
                "start_time": record["start_time"].isoformat() if record["start_time"] else None,
                "end_time": record["end_time"].isoformat() if record["end_time"] else None,
                "duration_seconds": record["duration"].total_seconds() if record["duration"] else None
            }
            export_data.append(export_record)
        
        try:
            with open(filename, "w", encoding="utf-8") as f:
                json.dump(export_data, f, ensure_ascii=False, indent=2)
            print(f"工作记录已导出到: {filename}")
        except Exception as e:
            print(f"导出记录失败: {e}")
    
    def import_records(self, filename):
        """
        导入工作记录
        
        参数:
            filename (str): 导入文件名
        """
        try:
            with open(filename, "r", encoding="utf-8") as f:
                import_data = json.load(f)
            
            self.work_records = []
            for record_data in import_data:
                record = {
                    "task_name": record_data["task_name"],
                    "start_time": datetime.fromisoformat(record_data["start_time"]) if record_data["start_time"] else None,
                    "end_time": datetime.fromisoformat(record_data["end_time"]) if record_data["end_time"] else None,
                    "duration": timedelta(seconds=record_data["duration_seconds"]) if record_data["duration_seconds"] else None
                }
                self.work_records.append(record)
            
            print(f"工作记录已从 {filename} 导入")
        except Exception as e:
            print(f"导入记录失败: {e}")

# 使用工作时间计算器
print("创建工作时间计算器:")
work_calculator = WorkTimeCalculator()

# 模拟工作记录
print("\n模拟工作记录:")
work_calculator.start_work("编写代码")
# 模拟工作1小时
import time
time.sleep(1)  # 实际使用中这里会是实际工作时间
work_calculator.end_work()

work_calculator.start_work("代码审查")
time.sleep(1)
work_calculator.end_work()

work_calculator.start_work("会议")
time.sleep(1)
work_calculator.end_work()

# 获取每日摘要
work_calculator.get_daily_summary()

# 获取周摘要
work_calculator.get_weekly_summary()

# 导出记录
work_calculator.export_records("work_records.json")

# 创建新的计算器并导入记录
print("\n导入记录测试:")
new_calculator = WorkTimeCalculator()
new_calculator.import_records("work_records.json")
new_calculator.get_daily_summary()

# 清理文件
import os
if os.path.exists("work_records.json"):
    os.remove("work_records.json")

代码说明

案例1代码解释

  1. datetime.now():获取当前日期时间
  2. timedelta(days=1, hours=2):创建时间差对象
  3. strftime('%Y-%m-%d %H:%M:%S'):格式化时间显示
  4. strptime(time_str, format_str):解析时间字符串

如果时间字符串格式与指定格式不匹配,strptime会抛出ValueError异常。

案例2代码解释

  1. datetime.fromisoformat(record_data["start_time"]):解析ISO格式的时间字符串
  2. duration.total_seconds():将时间差转换为秒数
  3. timedelta(seconds=record_data["duration_seconds"]):从秒数创建时间差对象
  4. divmod(time_diff.seconds, 3600):计算小时和剩余秒数

如果忘记处理未结束的工作记录,在计算时间时会出现NoneType错误。

3. math和random模块 - 数学计算和随机数

知识点解析

概念定义:math模块就像一个"科学计算器",提供了各种数学函数,如三角函数、对数函数、幂函数等。random模块则像一个"随机数生成器",可以生成各种随机数,用于模拟、游戏或加密等场景。

核心规则

  1. math模块提供数学常数和函数
  2. random模块提供随机数生成功能
  3. 使用math.radians()和math.degrees()进行角度转换
  4. 使用random.seed()设置随机种子以获得可重现的结果

常见易错点

  1. 忘记角度和弧度的转换导致三角函数计算错误
  2. 对负数开平方或对数导致数学域错误
  3. 不设置随机种子导致结果无法重现
  4. 误用random.randint()和random.randrange()的参数

实战案例

案例1:科学计算器

# 科学计算器
print("===科学计算器===")

import math
import random

class ScientificCalculator:
    """科学计算器"""
    
    def __init__(self):
        """初始化科学计算器"""
        print("科学计算器已启动")
    
    def basic_operations(self, a, b):
        """
        基本运算
        
        参数:
            a (float): 第一个数
            b (float): 第二个数
        """
        print(f"\n=== 基本运算 ({a}, {b}) ===")
        print(f"加法: {a} + {b} = {a + b}")
        print(f"减法: {a} - {b} = {a - b}")
        print(f"乘法: {a} * {b} = {a * b}")
        
        if b != 0:
            print(f"除法: {a} / {b} = {a / b}")
            print(f"取余: {a} % {b} = {a % b}")
            print(f"整除: {a} // {b} = {a // b}")
        else:
            print("除法: 除数不能为零")
        
        print(f"幂运算: {a} ** {b} = {a ** b}")
    
    def math_functions(self, x):
        """
        数学函数
        
        参数:
            x (float): 输入值
        """
        print(f"\n=== 数学函数 (x = {x}) ===")
        
        # 基本函数
        print(f"绝对值: abs({x}) = {abs(x)}")
        print(f"向上取整: math.ceil({x}) = {math.ceil(x)}")
        print(f"向下取整: math.floor({x}) = {math.floor(x)}")
        
        # 幂和对数函数
        if x > 0:
            print(f"平方根: math.sqrt({x}) = {math.sqrt(x):.6f}")
            print(f"自然对数: math.log({x}) = {math.log(x):.6f}")
            print(f"以10为底的对数: math.log10({x}) = {math.log10(x):.6f}")
        elif x == 0:
            print("平方根: 0")
            print("自然对数: 负无穷")
            print("以10为底的对数: 负无穷")
        else:
            print("平方根: 未定义(负数)")
            print("自然对数: 未定义(负数)")
            print("以10为底的对数: 未定义(负数)")
        
        print(f"e的x次方: math.exp({x}) = {math.exp(x):.6f}")
        print(f"x的平方: math.pow({x}, 2) = {math.pow(x, 2):.6f}")
        
        # 三角函数(需要将角度转换为弧度)
        rad = math.radians(x)
        print(f"角度转弧度: math.radians({x}) = {rad:.6f}")
        print(f"sin({x}°) = {math.sin(rad):.6f}")
        print(f"cos({x}°) = {math.cos(rad):.6f}")
        print(f"tan({x}°) = {math.tan(rad):.6f}")
        
        # 反三角函数
        if -1 <= x <= 1:
            print(f"arcsin({x}) = {math.degrees(math.asin(x)):.2f}°")
            print(f"arccos({x}) = {math.degrees(math.acos(x)):.2f}°")
        print(f"arctan({x}) = {math.degrees(math.atan(x)):.2f}°")
    
    def constants_and_advanced_functions(self):
        """常数和高级函数"""
        print(f"\n=== 数学常数 ===")
        print(f"圆周率 π: {math.pi:.10f}")
        print(f"自然常数 e: {math.e:.10f}")
        print(f"2π (τ): {math.tau:.10f}")
        print(f"正无穷: {math.inf}")
        print(f"负无穷: {-math.inf}")
        print(f"非数字: {math.nan}")
        
        print(f"\n=== 高级函数 ===")
        print(f"最大公约数 gcd(48, 18): {math.gcd(48, 18)}")
        print(f"阶乘 5!: {math.factorial(5)}")
        
        # 误差函数
        print(f"误差函数 erf(1): {math.erf(1):.6f}")
        
        # 伽马函数
        print(f"伽马函数 gamma(5): {math.gamma(5):.6f}")
        
        # 双曲函数
        print(f"双曲正弦 sinh(1): {math.sinh(1):.6f}")
        print(f"双曲余弦 cosh(1): {math.cosh(1):.6f}")
        print(f"双曲正切 tanh(1): {math.tanh(1):.6f}")
    
    def statistics_functions(self, numbers):
        """
        统计函数
        
        参数:
            numbers (list): 数字列表
        """
        if not numbers:
            print("数字列表为空")
            return
        
        print(f"\n=== 统计函数 ({numbers}) ===")
        print(f"求和: {sum(numbers)}")
        print(f"平均值: {sum(numbers) / len(numbers):.2f}")
        print(f"最大值: {max(numbers)}")
        print(f"最小值: {min(numbers)}")
        
        # 方差和标准差
        mean = sum(numbers) / len(numbers)
        variance = sum((x - mean) ** 2 for x in numbers) / len(numbers)
        std_dev = math.sqrt(variance)
        print(f"方差: {variance:.2f}")
        print(f"标准差: {std_dev:.2f}")
        
        # 中位数
        sorted_numbers = sorted(numbers)
        n = len(sorted_numbers)
        if n % 2 == 0:
            median = (sorted_numbers[n//2 - 1] + sorted_numbers[n//2]) / 2
        else:
            median = sorted_numbers[n//2]
        print(f"中位数: {median}")

# 使用科学计算器
print("创建科学计算器:")
calculator = ScientificCalculator()

# 基本运算
calculator.basic_operations(10, 3)

# 数学函数
calculator.math_functions(30)

# 常数和高级函数
calculator.constants_and_advanced_functions()

# 统计函数
calculator.statistics_functions([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

案例2:概率统计模拟器

# 概率统计模拟器
print("\n===概率统计模拟器===")

import random
import math
from collections import Counter

class ProbabilitySimulator:
    """概率统计模拟器"""
    
    def __init__(self):
        """初始化概率统计模拟器"""
        print("概率统计模拟器已启动")
    
    def coin_flip_simulation(self, num_flips=1000):
        """
        抛硬币模拟
        
        参数:
            num_flips (int): 抛硬币次数
        """
        print(f"\n=== 抛硬币模拟 ({num_flips} 次) ===")
        
        # 设置随机种子以获得可重现的结果
        random.seed(42)
        
        results = []
        for _ in range(num_flips):
            # 0表示反面,1表示正面
            flip = random.randint(0, 1)
            results.append("正面" if flip == 1 else "反面")
        
        # 统计结果
        counter = Counter(results)
        heads_count = counter["正面"]
        tails_count = counter["反面"]
        
        print(f"正面次数: {heads_count} ({heads_count/num_flips*100:.1f}%)")
        print(f"反面次数: {tails_count} ({tails_count/num_flips*100:.1f}%)")
        
        # 计算与理论值的偏差
        expected = num_flips / 2
        heads_deviation = abs(heads_count - expected) / expected * 100
        print(f"正面偏差: {heads_deviation:.1f}%")
    
    def dice_roll_simulation(self, num_rolls=1000, num_dice=2):
        """
        骰子投掷模拟
        
        参数:
            num_rolls (int): 投掷次数
            num_dice (int): 骰子数量
        """
        print(f"\n=== {num_dice}个骰子投掷模拟 ({num_rolls} 次) ===")
        
        random.seed(42)
        
        results = []
        for _ in range(num_rolls):
            # 投掷多个骰子并求和
            roll_sum = sum(random.randint(1, 6) for _ in range(num_dice))
            results.append(roll_sum)
        
        # 统计结果
        counter = Counter(results)
        total_possible_outcomes = 6 ** num_dice
        
        print("点数分布:")
        for point in sorted(counter.keys()):
            count = counter[point]
            probability = count / num_rolls * 100
            theoretical_prob = self._theoretical_dice_probability(point, num_dice) * 100
            print(f"  {point}点: {count}次 ({probability:.1f}%) [理论: {theoretical_prob:.1f}%]")
    
    def _theoretical_dice_probability(self, target_sum, num_dice):
        """
        计算骰子点数和的理论概率
        
        参数:
            target_sum (int): 目标点数和
            num_dice (int): 骰子数量
            
        返回:
            float: 理论概率
        """
        # 这是一个简化的计算方法,适用于少量骰子
        if num_dice == 1:
            return 1/6 if 1 <= target_sum <= 6 else 0
        elif num_dice == 2:
            # 对于两个骰子,点数和的概率分布是已知的
            if 2 <= target_sum <= 7:
                return (target_sum - 1) / 36
            elif 8 <= target_sum <= 12:
                return (13 - target_sum) / 36
            else:
                return 0
        else:
            # 对于更多骰子,使用近似计算
            return 1 / (6 ** num_dice)  # 简化处理
    
    def normal_distribution_simulation(self, sample_size=10000):
        """
        正态分布模拟(使用中心极限定理)
        
        参数:
            sample_size (int): 样本大小
        """
        print(f"\n=== 正态分布模拟 ({sample_size} 个样本) ===")
        
        random.seed(42)
        
        # 使用中心极限定理生成近似正态分布的数据
        # 通过求多个均匀分布随机数的平均值来近似正态分布
        samples = []
        for _ in range(sample_size):
            # 生成12个均匀分布的随机数并求和,减去6得到近似标准正态分布
            uniform_sum = sum(random.random() for _ in range(12))
            standard_normal = uniform_sum - 6
            samples.append(standard_normal)
        
        # 计算统计信息
        mean = sum(samples) / len(samples)
        variance = sum((x - mean) ** 2 for x in samples) / len(samples)
        std_dev = math.sqrt(variance)
        
        print(f"样本均值: {mean:.4f} (理论值: 0)")
        print(f"样本标准差: {std_dev:.4f} (理论值: 1)")
        
        # 分组统计
        print("\n分布情况:")
        min_val = min(samples)
        max_val = max(samples)
        interval = (max_val - min_val) / 10
        
        for i in range(10):
            lower = min_val + i * interval
            upper = min_val + (i + 1) * interval
            count = sum(1 for x in samples if lower <= x < upper)
            if i == 9:  # 最后一个区间包含上界
                count = sum(1 for x in samples if lower <= x <= upper)
            
            percentage = count / len(samples) * 100
            print(f"  [{lower:5.2f}, {upper:5.2f}): {count:4d} ({percentage:4.1f}%)")
    
    def random_sampling(self, population, sample_size):
        """
        随机抽样
        
        参数:
            population (list): 总体
            sample_size (int): 样本大小
        """
        print(f"\n=== 随机抽样 ===")
        print(f"总体大小: {len(population)}")
        print(f"样本大小: {sample_size}")
        
        if sample_size > len(population):
            print("样本大小不能超过总体大小")
            return
        
        # 不重复抽样
        sample_without_replacement = random.sample(population, sample_size)
        print(f"不重复抽样: {sample_without_replacement}")
        
        # 重复抽样
        sample_with_replacement = [random.choice(population) for _ in range(sample_size)]
        print(f"重复抽样: {sample_with_replacement}")
        
        # 随机打乱
        shuffled_population = population.copy()
        random.shuffle(shuffled_population)
        print(f"随机打乱前5个: {shuffled_population[:5]}")
    
    def monte_carlo_pi_estimation(self, num_points=1000000):
        """
        蒙特卡洛方法估算π值
        
        参数:
            num_points (int): 随机点数量
        """
        print(f"\n=== 蒙特卡洛估算π值 ({num_points} 个点) ===")
        
        random.seed(42)
        
        points_inside_circle = 0
        
        for _ in range(num_points):
            # 在单位正方形内生成随机点
            x = random.uniform(-1, 1)
            y = random.uniform(-1, 1)
            
            # 检查点是否在单位圆内
            if x**2 + y**2 <= 1:
                points_inside_circle += 1
        
        # 根据蒙特卡洛方法,π ≈ 4 * (圆内点数 / 总点数)
        estimated_pi = 4 * points_inside_circle / num_points
        error = abs(estimated_pi - math.pi) / math.pi * 100
        
        print(f"估算的π值: {estimated_pi:.6f}")
        print(f"真实π值: {math.pi:.6f}")
        print(f"误差: {error:.4f}%")

# 使用概率统计模拟器
print("创建概率统计模拟器:")
simulator = ProbabilitySimulator()

# 抛硬币模拟
simulator.coin_flip_simulation(1000)

# 骰子投掷模拟
simulator.dice_roll_simulation(1000, 2)

# 正态分布模拟
simulator.normal_distribution_simulation(10000)

# 随机抽样
population = list(range(1, 101))  # 1到100的数字
simulator.random_sampling(population, 10)

# 蒙特卡洛估算π值
simulator.monte_carlo_pi_estimation(100000)

代码说明

案例1代码解释

  1. math.sqrt(x):计算平方根
  2. math.sin(math.radians(x)):先将角度转换为弧度再计算正弦值
  3. math.factorial(5):计算阶乘
  4. sum((x - mean) ** 2 for x in numbers) / len(numbers):计算方差

如果对负数调用math.sqrt(),会抛出ValueError异常。

案例2代码解释

  1. random.randint(0, 1):生成0或1的随机整数
  2. random.sample(population, sample_size):不重复随机抽样
  3. random.uniform(-1, 1):生成指定范围内的随机浮点数
  4. random.seed(42):设置随机种子以获得可重现的结果

如果sample_size大于总体大小,random.sample()会抛出ValueError异常。

4. json和re模块 - 数据处理和文本匹配

知识点解析

概念定义:json模块就像一个"数据翻译官",可以将Python对象转换为JSON格式的文本,也可以将JSON文本转换为Python对象,便于数据存储和传输。re模块则像一个"文本侦探",可以使用正则表达式在文本中查找、替换特定模式的内容。

核心规则

  1. json模块用于JSON数据的序列化和反序列化
  2. re模块用于正则表达式匹配和文本处理
  3. 使用json.dumps()和json.loads()处理字符串
  4. 使用json.dump()和json.load()处理文件

常见易错点

  1. 忘记处理JSON解析异常
  2. 正则表达式写错导致匹配不到预期内容
  3. 不使用原始字符串(r"")导致转义字符问题
  4. 忘记指定编码格式导致中文乱码

实战案例

案例1:数据序列化管理器

# 数据序列化管理器
print("===数据序列化管理器===")

import json
import re
from datetime import datetime

class DataSerializationManager:
    """数据序列化管理器"""
    
    def __init__(self):
        """初始化数据序列化管理器"""
        print("数据序列化管理器已启动")
    
    def serialize_to_json(self, data, filename=None):
        """
        序列化数据到JSON
        
        参数:
            data: 要序列化的数据
            filename (str): 保存文件名,如果为None则返回字符串
            
        返回:
            str or bool: JSON字符串或保存成功与否
        """
        try:
            if filename:
                with open(filename, "w", encoding="utf-8") as f:
                    json.dump(data, f, ensure_ascii=False, indent=2)
                print(f"数据已保存到: {filename}")
                return True
            else:
                json_string = json.dumps(data, ensure_ascii=False, indent=2)
                return json_string
        except Exception as e:
            print(f"序列化数据失败: {e}")
            return False if filename else None
    
    def deserialize_from_json(self, source):
        """
        从JSON反序列化数据
        
        参数:
            source (str): JSON字符串或文件名
            
        返回:
            object: 反序列化的数据
        """
        try:
            # 判断是文件名还是JSON字符串
            if source.endswith('.json') and len(source) > 5:
                # 假设是文件名
                with open(source, "r", encoding="utf-8") as f:
                    data = json.load(f)
                print(f"数据已从 {source} 加载")
            else:
                # 假设是JSON字符串
                data = json.loads(source)
            return data
        except FileNotFoundError:
            print(f"文件未找到: {source}")
            return None
        except json.JSONDecodeError as e:
            print(f"JSON解析错误: {e}")
            return None
        except Exception as e:
            print(f"反序列化数据失败: {e}")
            return None
    
    def create_sample_data(self):
        """创建示例数据"""
        return {
            "用户信息": {
                "姓名": "张三",
                "年龄": 28,
                "邮箱": "zhangsan@example.com",
                "电话": "13812345678",
                "地址": {
                    "省份": "北京市",
                    "城市": "北京市",
                    "详细地址": "朝阳区某某街道123号"
                },
                "兴趣爱好": ["阅读", "游泳", "编程"]
            },
            "订单信息": [
                {
                    "订单号": "ORD202312001",
                    "商品": "笔记本电脑",
                    "价格": 5999.00,
                    "数量": 1,
                    "状态": "已发货"
                },
                {
                    "订单号": "ORD202312002",
                    "商品": "无线鼠标",
                    "价格": 99.00,
                    "数量": 2,
                    "状态": "已付款"
                }
            ],
            "系统信息": {
                "创建时间": datetime.now().isoformat(),
                "版本": "1.0.0",
                "启用": True
            }
        }
    
    def validate_json_format(self, json_string):
        """
        验证JSON格式
        
        参数:
            json_string (str): JSON字符串
            
        返回:
            bool: 格式是否正确
        """
        try:
            json.loads(json_string)
            return True
        except json.JSONDecodeError:
            return False
    
    def format_json_string(self, json_string):
        """
        格式化JSON字符串
        
        参数:
            json_string (str): JSON字符串
            
        返回:
            str: 格式化后的JSON字符串
        """
        try:
            data = json.loads(json_string)
            return json.dumps(data, ensure_ascii=False, indent=2)
        except json.JSONDecodeError as e:
            print(f"JSON格式错误: {e}")
            return None

# 使用数据序列化管理器
print("创建数据序列化管理器:")
manager = DataSerializationManager()

# 创建示例数据
sample_data = manager.create_sample_data()
print("创建示例数据完成")

# 序列化到JSON字符串
print("\n序列化到JSON字符串:")
json_string = manager.serialize_to_json(sample_data)
if json_string:
    print("序列化成功:")
    print(json_string[:200] + "..." if len(json_string) > 200 else json_string)

# 保存到文件
print("\n保存到文件:")
manager.serialize_to_json(sample_data, "sample_data.json")

# 从文件加载
print("\n从文件加载:")
loaded_data = manager.deserialize_from_json("sample_data.json")
if loaded_data:
    print("加载成功,用户姓名:", loaded_data["用户信息"]["姓名"])

# 验证JSON格式
print("\n验证JSON格式:")
valid_json = '{"姓名": "李四", "年龄": 25}'
invalid_json = '{"姓名": "李四", "年龄": 25'  # 缺少闭合括号

print(f"有效JSON: {manager.validate_json_format(valid_json)}")
print(f"无效JSON: {manager.validate_json_format(invalid_json)}")

# 格式化JSON字符串
print("\n格式化JSON字符串:")
messy_json = '{"姓名":"王五","年龄":30,"城市":"上海"}'
formatted_json = manager.format_json_string(messy_json)
if formatted_json:
    print("格式化后:")
    print(formatted_json)

# 清理文件
import os
if os.path.exists("sample_data.json"):
    os.remove("sample_data.json")

案例2:文本处理和信息提取器

# 文本处理和信息提取器
print("\n===文本处理和信息提取器===")

import re
import json

class TextProcessor:
    """文本处理和信息提取器"""
    
    def __init__(self):
        """初始化文本处理器"""
        print("文本处理和信息提取器已启动")
    
    def extract_emails(self, text):
        """
        提取邮箱地址
        
        参数:
            text (str): 文本内容
            
        返回:
            list: 邮箱地址列表
        """
        # 邮箱正则表达式
        email_pattern = r'\b[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}\b'
        emails = re.findall(email_pattern, text)
        return emails
    
    def extract_phone_numbers(self, text):
        """
        提取电话号码
        
        参数:
            text (str): 文本内容
            
        返回:
            list: 电话号码列表
        """
        # 中国手机号正则表达式
        phone_pattern = r'\b1[3-9]\d{9}\b'
        phones = re.findall(phone_pattern, text)
        return phones
    
    def extract_urls(self, text):
        """
        提取URL链接
        
        参数:
            text (str): 文本内容
            
        返回:
            list: URL列表
        """
        # URL正则表达式
        url_pattern = r'https?://(?:[-\w.])+(?:[:\d]+)?(?:/(?:[\w/_.])*(?:\?(?:[\w&=%.])*)?(?:#(?:[\w.])*)?)?'
        urls = re.findall(url_pattern, text)
        return urls
    
    def mask_sensitive_info(self, text):
        """
        遮蔽敏感信息
        
        参数:
            text (str): 文本内容
            
        返回:
            str: 遮蔽后的文本
        """
        # 遮蔽手机号
        text = re.sub(r'\b(1[3-9]\d)(\d{4})(\d{4})\b', r'\1****\3', text)
        
        # 遮蔽邮箱(保留@前第一个字符和@后域名)
        text = re.sub(r'\b([a-zA-Z0-9])[a-zA-Z0-9._%+-]*@([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})\b', r'\1***@\2', text)
        
        # 遮蔽身份证号
        text = re.sub(r'\b(\d{4})\d{10}(\d{4})\b', r'\1**********\2', text)
        
        return text
    
    def find_keywords(self, text, keywords):
        """
        查找关键词
        
        参数:
            text (str): 文本内容
            keywords (list): 关键词列表
            
        返回:
            dict: 关键词及其出现次数
        """
        result = {}
        for keyword in keywords:
            # 使用单词边界确保精确匹配
            pattern = r'\b' + re.escape(keyword) + r'\b'
            matches = re.findall(pattern, text, re.IGNORECASE)
            result[keyword] = len(matches)
        return result
    
    def replace_text(self, text, pattern, replacement, ignore_case=False):
        """
        替换文本
        
        参数:
            text (str): 原文本
            pattern (str): 要替换的模式
            replacement (str): 替换内容
            ignore_case (bool): 是否忽略大小写
            
        返回:
            str: 替换后的文本
        """
        flags = re.IGNORECASE if ignore_case else 0
        return re.sub(pattern, replacement, text, flags=flags)
    
    def split_text(self, text, delimiter_pattern):
        """
        分割文本
        
        参数:
            text (str): 文本内容
            delimiter_pattern (str): 分隔符模式
            
        返回:
            list: 分割后的文本列表
        """
        return re.split(delimiter_pattern, text)
    
    def validate_input(self, text, pattern_type):
        """
        验证输入格式
        
        参数:
            text (str): 要验证的文本
            pattern_type (str): 验证类型 (email, phone, id_card, ip)
            
        返回:
            bool: 是否符合格式
        """
        patterns = {
            "email": r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$',
            "phone": r'^1[3-9]\d{9}$',
            "id_card": r'^\d{17}[\dXx]$',
            "ip": r'^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$'
        }
        
        if pattern_type in patterns:
            return bool(re.match(patterns[pattern_type], text))
        else:
            raise ValueError(f"不支持的验证类型: {pattern_type}")

# 使用文本处理器
print("创建文本处理器:")
processor = TextProcessor()

# 测试文本
test_text = """
联系我们:
邮箱:support@example.com 或 admin@test.org
电话:13812345678,客服热线:400-123-4567
网站:https://www.example.com 或 http://test.org:8080/page?id=123
地址:北京市朝阳区某某街道123号
身份证号:110101199001011234
IP地址:192.168.1.1

重要通知:所有用户必须在2023年12月31日前完成信息更新。
关键词包括:用户、信息、更新、完成、必须。
"""

# 提取邮箱
print("提取邮箱:")
emails = processor.extract_emails(test_text)
for email in emails:
    print(f"  {email}")

# 提取电话号码
print("\n提取电话号码:")
phones = processor.extract_phone_numbers(test_text)
for phone in phones:
    print(f"  {phone}")

# 提取URL
print("\n提取URL:")
urls = processor.extract_urls(test_text)
for url in urls:
    print(f"  {url}")

# 遮蔽敏感信息
print("\n遮蔽敏感信息:")
masked_text = processor.mask_sensitive_info(test_text)
print(masked_text)

# 查找关键词
print("\n查找关键词:")
keywords = ["用户", "信息", "更新", "完成", "必须"]
keyword_counts = processor.find_keywords(test_text, keywords)
for keyword, count in keyword_counts.items():
    print(f"  '{keyword}': {count} 次")

# 替换文本
print("\n替换文本:")
replaced_text = processor.replace_text(test_text, r"2023年12月31日", "2024年1月31日")
print("替换日期后:")
print(replaced_text.split('\n')[-3])  # 只显示最后一行

# 分割文本
print("\n分割文本:")
sentences = processor.split_text(test_text, r'[。!?]')
print(f"分割成 {len(sentences)} 句话,前3句:")
for i, sentence in enumerate(sentences[:3]):
    print(f"  {i+1}. {sentence.strip()}")

# 验证输入格式
print("\n验证输入格式:")
test_inputs = [
    ("test@example.com", "email"),
    ("13812345678", "phone"),
    ("110101199001011234", "id_card"),
    ("192.168.1.1", "ip"),
    ("invalid-email", "email"),
    ("12345", "phone")
]

for input_text, input_type in test_inputs:
    is_valid = processor.validate_input(input_text, input_type)
    print(f"  {input_text} ({input_type}): {'有效' if is_valid else '无效'}")

代码说明

案例1代码解释

  1. json.dumps(data, ensure_ascii=False, indent=2):将Python对象序列化为格式化的JSON字符串
  2. json.loads(json_string):将JSON字符串反序列化为Python对象
  3. json.dump(data, f, ensure_ascii=False, indent=2):将数据直接写入文件
  4. json.load(f):从文件直接读取并反序列化

如果JSON字符串格式不正确,json.loads()会抛出JSONDecodeError异常。

案例2代码解释

  1. re.findall(pattern, text):查找所有匹配的文本
  2. re.sub(pattern, replacement, text):替换匹配的文本
  3. re.split(delimiter_pattern, text):按模式分割文本
  4. re.escape(keyword):转义特殊字符以进行字面量匹配

如果正则表达式中有特殊字符没有正确转义,可能会导致匹配结果不符合预期。

5. collections和pathlib模块 - 高级数据结构和路径操作

知识点解析

概念定义:collections模块就像一个"高级工具箱",提供了比内置数据类型更强大的容器类,如计数器、默认字典、命名元组等。pathlib模块则像一个"现代化的路径专家",以面向对象的方式处理文件系统路径,比传统的os.path更加直观和易用。

核心规则

  1. collections模块提供高性能的容器数据类型
  2. pathlib模块提供面向对象的文件系统路径操作
  3. Counter用于统计元素出现次数
  4. Path对象提供丰富的路径操作方法

常见易错点

  1. 忘记collections中的容器类型是不可变的还是可变的
  2. 混用os.path和pathlib导致代码不一致
  3. 不处理路径不存在的情况
  4. 忘记指定文件编码导致中文乱码

实战案例

案例1:数据分析工具

# 数据分析工具
print("===数据分析工具===")

from collections import Counter, defaultdict, namedtuple, deque
from pathlib import Path
import json

class DataAnalyzer:
    """数据分析工具"""
    
    def __init__(self):
        """初始化数据分析工具"""
        print("数据分析工具已启动")
    
    def analyze_text_frequency(self, text):
        """
        分析文本词频
        
        参数:
            text (str): 要分析的文本
            
        返回:
            Counter: 词频统计
        """
        # 简单的分词(按空格和标点符号分割)
        import re
        words = re.findall(r'\b[a-zA-Z]+\b', text.lower())
        
        # 使用Counter统计词频
        word_counter = Counter(words)
        return word_counter
    
    def analyze_character_frequency(self, text):
        """
        分析字符频率
        
        参数:
            text (str): 要分析的文本
            
        返回:
            Counter: 字符频率统计
        """
        # 过滤掉空格和换行符
        characters = [char for char in text if char not in ' \n\t']
        char_counter = Counter(characters)
        return char_counter
    
    def get_most_common_words(self, text, n=10):
        """
        获取最常见的单词
        
        参数:
            text (str): 文本内容
            n (int): 返回前n个最常见的单词
            
        返回:
            list: 最常见的单词及其频率
        """
        word_counter = self.analyze_text_frequency(text)
        return word_counter.most_common(n)
    
    def group_data_by_category(self, data):
        """
        按类别分组数据
        
        参数:
            data (list): 包含字典的列表,每个字典应有'category'键
            
        返回:
            defaultdict: 按类别分组的数据
        """
        # 使用defaultdict自动创建空列表
        grouped_data = defaultdict(list)
        
        for item in data:
            category = item.get('category', 'unknown')
            grouped_data[category].append(item)
        
        return grouped_data
    
    def create_data_structure(self):
        """创建数据结构示例"""
        # 使用namedtuple创建结构化数据
        Person = namedtuple('Person', ['name', 'age', 'city', 'occupation'])
        
        people = [
            Person('张三', 28, '北京', '工程师'),
            Person('李四', 32, '上海', '设计师'),
            Person('王五', 25, '北京', '工程师'),
            Person('赵六', 30, '广州', '产品经理'),
            Person('钱七', 28, '上海', '设计师')
        ]
        
        return people
    
    def analyze_people_data(self, people):
        """
        分析人员数据
        
        参数:
            people (list): namedtuple人员列表
        """
        print("\n=== 人员数据分析 ===")
        
        # 统计职业分布
        occupations = [person.occupation for person in people]
        occupation_counter = Counter(occupations)
        print("职业分布:")
        for occupation, count in occupation_counter.items():
            print(f"  {occupation}: {count}人")
        
        # 统计城市分布
        cities = [person.city for person in people]
        city_counter = Counter(cities)
        print("\n城市分布:")
        for city, count in city_counter.items():
            print(f"  {city}: {count}人")
        
        # 统计年龄分布
        ages = [person.age for person in people]
        age_counter = Counter(ages)
        print("\n年龄分布:")
        for age, count in sorted(age_counter.items()):
            print(f"  {age}岁: {count}人")
    
    def demonstrate_deque_usage(self):
        """演示deque的使用"""
        print("\n=== deque使用演示 ===")
        
        # 创建deque
        dq = deque([1, 2, 3, 4, 5])
        print(f"初始deque: {dq}")
        
        # 两端添加元素
        dq.append(6)        # 右端添加
        dq.appendleft(0)    # 左端添加
        print(f"添加元素后: {dq}")
        
        # 两端删除元素
        popped_right = dq.pop()      # 右端弹出
        popped_left = dq.popleft()   # 左端弹出
        print(f"弹出元素: {popped_right}(右), {popped_left}(左)")
        print(f"弹出后: {dq}")
        
        # 限制deque大小
        limited_dq = deque(maxlen=3)
        for i in range(6):
            limited_dq.append(i)
            print(f"添加{i}后: {limited_dq} (最大长度: {limited_dq.maxlen})")

# 使用数据分析工具
print("创建数据分析工具:")
analyzer = DataAnalyzer()

# 分析文本词频
sample_text = """
Python is a powerful programming language. Python is easy to learn and Python is widely used.
Many developers love Python because Python is versatile and Python has a great community.
Data science, web development, automation, and artificial intelligence all use Python.
Python's syntax is clean and readable, making Python a great choice for beginners.
"""

print("分析文本词频:")
word_freq = analyzer.get_most_common_words(sample_text, 5)
for word, count in word_freq:
    print(f"  {word}: {count}次")

# 分析字符频率
print("\n分析字符频率:")
char_freq = analyzer.analyze_character_frequency("hello world")
for char, count in char_freq.most_common(5):
    print(f"  '{char}': {count}次")

# 按类别分组数据
sample_data = [
    {'name': '项目A', 'category': '技术', 'priority': '高'},
    {'name': '项目B', 'category': '市场', 'priority': '中'},
    {'name': '项目C', 'category': '技术', 'priority': '低'},
    {'name': '项目D', 'category': '运营', 'priority': '高'},
    {'name': '项目E', 'category': '技术', 'priority': '中'}
]

print("\n按类别分组数据:")
grouped = analyzer.group_data_by_category(sample_data)
for category, items in grouped.items():
    print(f"  {category}类别 ({len(items)}项):")
    for item in items:
        print(f"    - {item['name']} (优先级: {item['priority']})")

# 创建和分析人员数据
people = analyzer.create_data_structure()
analyzer.analyze_people_data(people)

# 演示deque使用
analyzer.demonstrate_deque_usage()

案例2:文件系统分析器

# 文件系统分析器
print("\n===文件系统分析器===")

from collections import Counter, defaultdict
from pathlib import Path
import os

class FileSystemAnalyzer:
    """文件系统分析器"""
    
    def __init__(self):
        """初始化文件系统分析器"""
        print("文件系统分析器已启动")
    
    def analyze_directory(self, directory_path):
        """
        分析目录结构
        
        参数:
            directory_path (str): 目录路径
            
        返回:
            dict: 分析结果
        """
        directory = Path(directory_path)
        if not directory.exists():
            print(f"目录不存在: {directory_path}")
            return None
        
        if not directory.is_dir():
            print(f"路径不是目录: {directory_path}")
            return None
        
        print(f"开始分析目录: {directory_path}")
        
        # 统计信息
        file_count = 0
        dir_count = 0
        extension_counter = Counter()
        size_counter = Counter()
        size_distribution = defaultdict(int)
        
        try:
            # 递归遍历目录
            for item in directory.rglob('*'):
                if item.is_file():
                    file_count += 1
                    
                    # 统计文件扩展名
                    extension = item.suffix.lower() if item.suffix else '[无扩展名]'
                    extension_counter[extension] += 1
                    
                    # 统计文件大小
                    try:
                        size = item.stat().st_size
                        # 按大小分类
                        if size < 1024:  # < 1KB
                            size_category = '< 1KB'
                        elif size < 1024 * 1024:  # < 1MB
                            size_category = '1KB - 1MB'
                        elif size < 10 * 1024 * 1024:  # < 10MB
                            size_category = '1MB - 10MB'
                        elif size < 100 * 1024 * 1024:  # < 100MB
                            size_category = '10MB - 100MB'
                        else:  # >= 100MB
                            size_category = '>= 100MB'
                        
                        size_distribution[size_category] += 1
                        
                    except OSError:
                        pass  # 忽略无法访问的文件
                        
                elif item.is_dir():
                    dir_count += 1
            
            # 构建分析结果
            analysis = {
                'total_files': file_count,
                'total_directories': dir_count,
                'extensions': dict(extension_counter.most_common()),
                'size_distribution': dict(size_distribution),
                'largest_files': self._get_largest_files(directory, 5)
            }
            
            return analysis
            
        except Exception as e:
            print(f"分析目录时出错: {e}")
            return None
    
    def _get_largest_files(self, directory, count):
        """
        获取最大的文件
        
        参数:
            directory (Path): 目录路径
            count (int): 返回文件数量
            
        返回:
            list: 最大文件列表
        """
        files_with_size = []
        
        for item in directory.rglob('*'):
            if item.is_file():
                try:
                    size = item.stat().st_size
                    files_with_size.append((str(item), size))
                except OSError:
                    pass
        
        # 按大小排序并返回前count个
        files_with_size.sort(key=lambda x: x[1], reverse=True)
        return files_with_size[:count]
    
    def show_analysis_report(self, analysis):
        """
        显示分析报告
        
        参数:
            analysis (dict): 分析结果
        """
        if not analysis:
            print("没有分析数据")
            return
        
        print("\n=== 文件系统分析报告 ===")
        print(f"总文件数: {analysis['total_files']}")
        print(f"总目录数: {analysis['total_directories']}")
        
        # 显示文件扩展名统计
        if analysis['extensions']:
            print("\n文件类型统计 (前10种):")
            for extension, count in list(analysis['extensions'].items())[:10]:
                print(f"  {extension}: {count}个")
        
        # 显示文件大小分布
        if analysis['size_distribution']:
            print("\n文件大小分布:")
            for size_range, count in analysis['size_distribution'].items():
                print(f"  {size_range}: {count}个")
        
        # 显示最大的文件
        if analysis['largest_files']:
            print("\n最大的5个文件:")
            for file_path, size in analysis['largest_files']:
                # 格式化文件大小
                if size < 1024:
                    size_str = f"{size}B"
                elif size < 1024 * 1024:
                    size_str = f"{size/1024:.1f}KB"
                elif size < 1024 * 1024 * 1024:
                    size_str = f"{size/(1024*1024):.1f}MB"
                else:
                    size_str = f"{size/(1024*1024*1024):.1f}GB"
                
                print(f"  {size_str}: {file_path}")
    
    def create_sample_structure(self):
        """创建示例目录结构"""
        print("\n=== 创建示例目录结构 ===")
        
        # 创建测试目录结构
        structure = {
            "test_project": {
                "src": {
                    "main.py": "print('Hello, World!')",
                    "utils.py": "def helper():\n    return 'Helper function'",
                    "config.py": "CONFIG = {'debug': True}"
                },
                "docs": {
                    "README.md": "# Test Project\nThis is a test project.",
                    "guide.txt": "User guide content",
                    "api.md": "# API Documentation\nAPI details here."
                },
                "data": {
                    "users.json": '[{"name": "张三", "age": 25}]',
                    "config.xml": '<config><debug>true</debug></config>',
                    "large_file.txt": "Large file content. " * 1000  # 创建大文件
                },
                "tests": {
                    "test_main.py": "def test_main():\n    assert True",
                    "test_utils.py": "def test_utils():\n    assert True"
                }
            }
        }
        
        self._create_structure(structure)
        print("示例目录结构创建完成")
    
    def _create_structure(self, structure, base_path="."):
        """递归创建目录结构"""
        for name, content in structure.items():
            path = Path(base_path) / name
            if isinstance(content, dict):
                # 创建目录
                path.mkdir(exist_ok=True)
                # 递归创建子结构
                self._create_structure(content, path)
            else:
                # 创建文件
                with open(path, "w", encoding="utf-8") as f:
                    f.write(content)
    
    def cleanup_sample_structure(self):
        """清理示例目录结构"""
        print("\n=== 清理示例目录结构 ===")
        import shutil
        try:
            if Path("test_project").exists():
                shutil.rmtree("test_project")
                print("示例目录结构已清理")
            else:
                print("示例目录不存在")
        except Exception as e:
            print(f"清理目录时出错: {e}")

# 使用文件系统分析器
print("创建文件系统分析器:")
fs_analyzer = FileSystemAnalyzer()

# 创建示例结构
fs_analyzer.create_sample_structure()

# 分析目录
analysis = fs_analyzer.analyze_directory("test_project")

# 显示分析报告
fs_analyzer.show_analysis_report(analysis)

# 清理示例结构
fs_analyzer.cleanup_sample_structure()

代码说明

案例1代码解释

  1. Counter(words):统计单词出现次数
  2. defaultdict(list):自动创建空列表的字典
  3. namedtuple('Person', ['name', 'age', 'city', 'occupation']):创建命名元组
  4. deque([1, 2, 3], maxlen=3):创建有限长度的双端队列

如果向namedtuple实例尝试赋值,会抛出AttributeError异常,因为namedtuple是不可变的。

案例2代码解释

  1. Path(directory_path):创建Path对象
  2. directory.rglob('*'):递归遍历目录中的所有项目
  3. item.stat().st_size:获取文件大小
  4. item.suffix.lower():获取文件扩展名并转换为小写

如果目录中包含权限受限的文件,item.stat()可能会抛出OSError异常。

这些实战案例展示了Python标准库中常用模块的实际应用场景,包括系统操作、时间处理、数学计算、数据序列化、文本处理和高级数据结构等。通过这些例子,可以更好地理解如何在实际项目中使用这些模块来解决具体问题。


6. functools 模块 - 函数工具箱

知识点解析

概念定义:functools 模块提供了一系列"高阶函数工具",用于操作或增强其他函数。它就像给函数加装备的"工匠"——能缓存结果、固定参数、保留元信息等。

核心规则

  1. @lru_cache — 缓存函数返回值,避免重复计算(Least Recently Used 策略)
  2. @cached_property — 将结果缓存为实例属性(Python 3.8+)
  3. functools.partial — 固定函数的部分参数,生成新函数
  4. functools.reduce — 将函数累积应用到序列上
  5. @functools.wraps — 保留被装饰函数的元信息(__name____doc__
  6. functools.singledispatch — 单分派泛型函数(根据第一个参数类型选择实现)

常见易错点

  1. lru_cache 要求函数参数必须是可哈希的(list/dict 不能直接缓存)
  2. 忘记加 @wraps 导致被装饰函数的 __name__ 变成 wrapper
  3. partial 固定的是位置参数,要注意参数顺序
  4. cached_property 只在第一次访问时计算,之后从实例 __dict__ 取值
  5. reduce 在 Python 3 中已移到 functools,不再内置

实战案例

案例:缓存、偏函数与装饰器工具

import functools
import time

# === lru_cache 缓存 ===
@functools.lru_cache(maxsize=128)
def fibonacci(n):
    """递归计算斐波那契数(带缓存,避免重复计算)"""
    if n < 2:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)

# 不带缓存的递归:fib(35) 需要约 5 秒
# 带缓存的递归:fib(35) 瞬间完成
print(f"fibonacci(35) = {fibonacci(35)}")
print(f"缓存信息: {fibonacci.cache_info()}")

# === partial 偏函数 ===
def log(level, message, timestamp=None):
    ts = timestamp or time.strftime("%H:%M:%S")
    print(f"[{ts}] [{level}] {message}")

# 固定 level 参数,生成专用函数
info_log = functools.partial(log, "INFO")
error_log = functools.partial(log, "ERROR")
debug_log = functools.partial(log, "DEBUG")

info_log("系统启动")      # 输出: [14:30:00] [INFO] 系统启动
error_log("连接超时")     # 输出: [14:30:00] [ERROR] 连接超时

# === wraps 保留元信息 ===
def timer(func):
    """计时装饰器(使用 wraps 保留原函数信息)"""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f"{func.__name__} 执行耗时: {elapsed:.4f}s")
        return result
    return wrapper

@timer
def slow_function():
    """这是一个慢函数的文档字符串"""
    time.sleep(0.1)
    return "done"

print(f"函数名: {slow_function.__name__}")    # 输出: slow_function(不是 wrapper)
print(f"文档: {slow_function.__doc__}")        # 输出: 这是一个慢函数的文档字符串

# === singledispatch 泛型函数 ===
@functools.singledispatch
def process_data(data):
    print(f"默认处理: {data}")

@process_data.register(int)
def _(data):
    print(f"处理整数: {data} × 2 = {data * 2}")

@process_data.register(list)
def _(data):
    print(f"处理列表: 长度={len(data)}, 内容={data}")

@process_data.register(str)
def _(data):
    print(f"处理字符串: '{data}' (长度={len(data)})")

process_data(42)             # 处理整数
process_data([1, 2, 3])     # 处理列表
process_data("hello")        # 处理字符串
process_data(3.14)           # 默认处理

7. itertools 模块 - 迭代器工具箱

知识点解析

概念定义:itertools 模块提供了一套"迭代器工具",用于高效创建和操作迭代器。它就像一个"流水线工厂"——能组合、过滤、排列、累积各种数据流,而且全部是惰性求值,内存占用极低。

核心规则

  1. 无限迭代器:count(start, step)cycle(iterable)repeat(elem, n)
  2. 排列组合:product(A, B) 笛卡尔积、permutations(A, r) 排列、combinations(A, r) 组合
  3. 分组与切片:groupby(iterable, key)(需先排序)、islice(iterable, start, stop)
  4. 筛选与累积:filterfalse(pred, seq)accumulate(seq, func)chain(*iterables)
  5. itertools.starmap(func, iterable) — 类似 map 但解包元组参数

常见易错点

  1. groupby 只会合并相邻的相同元素,使用前必须先排序
  2. 无限迭代器(count/cycle)不能直接 list() 转换,会内存溢出
  3. product 的结果是笛卡尔积,数据量大时注意组合爆炸
  4. combinations 无重复组合,combinations_with_replacement 允许重复
  5. islice 不支持负索引

实战案例

案例:数据分析中的迭代器技巧

import itertools

# === chain:合并多个可迭代对象 ===
headers = ["姓名", "年龄", "城市"]
row1 = ["张三", 28, "北京"]
row2 = ["李四", 32, "上海"]
all_data = list(itertools.chain(headers, row1, row2))
print(f"合并数据: {all_data}")

# === product:笛卡尔积(所有组合)===
sizes = ["S", "M", "L"]
colors = ["红", "蓝"]
variants = list(itertools.product(colors, sizes))
print(f"SKU组合: {variants}")
print(f"共 {len(variants)} 种")

# === permutations vs combinations ===
team = ["A", "B", "C"]
print(f"全排列(permutations): {list(itertools.permutations(team, 2))}")
print(f"组合(combinations):   {list(itertools.combinations(team, 2))}")

# === groupby:分组(必须先排序!)===
data = [("水果", "苹果"), ("蔬菜", "白菜"), ("水果", "香蕉"),
        ("肉类", "牛肉"), ("蔬菜", "萝卜"), ("水果", "橙子")]
data.sort(key=lambda x: x[0])  # 必须先排序!
for category, items in itertools.groupby(data, key=lambda x: x[0]):
    print(f"{category}: {[item[1] for item in items]}")

# === accumulate:累积计算 ===
sales = [100, 200, 150, 300, 250]
cumsum = list(itertools.accumulate(sales))
print(f"销售额: {sales}")
print(f"累计额: {cumsum}")
# 自定义累积(累乘)
cumprod = list(itertools.accumulate(sales[1:], lambda x, y: x * y))
print(f"累乘: {cumprod}")

# === islice:惰性切片 ===
# 模拟大文件逐行读取,只取前3行
def fake_lines():
    for i in range(1, 1000000):
        yield f"第{i}行数据"

first_3 = list(itertools.islice(fake_lines(), 3))
print(f"前3行: {first_3}")

8. subprocess 模块 - 子进程管理

知识点解析

概念定义:subprocess 模块让你能在 Python 中启动新的进程、执行系统命令,就像在终端里敲命令一样。它是 os.system() 的现代替代品,功能更强大、更安全。

核心规则

  1. subprocess.run() — Python 3.5+ 推荐用法,运行命令并等待完成
  2. subprocess.Popen() — 高级用法,可以与子进程实时交互
  3. capture_output=True — 捕获 stdout 和 stderr(Python 3.7+)
  4. text=True — 将输出作为字符串返回(而非 bytes)
  5. timeout 参数 — 防止子进程卡死

常见易错点

  1. shell=True 存在命令注入风险,尽量避免使用
  2. Windows 下命令是 dir,Linux 下是 ls,注意跨平台差异
  3. run() 默认不会抛出异常(即使命令返回非零退出码),需要 check=True
  4. Popen 创建后需要手动 communicate()wait(),否则可能产生僵尸进程
  5. 管道输出过大时可能导致死锁(communicate() 会自动处理)

实战案例

案例:安全的命令执行工具

import subprocess
import sys

# === run:基本用法 ===
# 获取 Python 版本
result = subprocess.run(
    [sys.executable, "--version"],
    capture_output=True,
    text=True,
)
print(f"Python版本: {result.stdout.strip()}")

# check=True:命令失败时抛出 CalledProcessError
try:
    subprocess.run(
        [sys.executable, "-c", "print('Hello from subprocess!')"],
        capture_output=True,
        text=True,
        check=True,
    )
except subprocess.CalledProcessError as e:
    print(f"命令失败: {e}")

# === timeout:超时控制 ===
try:
    subprocess.run(
        [sys.executable, "-c", "import time; time.sleep(10)"],
        timeout=2,  # 最多等2秒
        capture_output=True,
        text=True,
    )
except subprocess.TimeoutExpired:
    print("命令执行超时!")

# === Popen:实时交互 ===
# 用管道将数据传给子进程
process = subprocess.Popen(
    [sys.executable, "-c", "import sys; print(sys.stdin.read().upper())"],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    text=True,
)
stdout, stderr = process.communicate(input="hello world\n")
print(f"子进程输出: {stdout.strip()}")

9. pathlib 进阶 - 现代 path 操作

知识点解析

概念定义:pathlib 是 Python 3.4 引入的面向对象路径库,用 Path 对象替代 os.path 的字符串拼接。Path 对象可以像操作字符串一样操作路径,同时提供文件读写、目录遍历等方法。

核心规则

  1. Path.glob("*.py") — 非递归匹配、Path.rglob("*.py") — 递归匹配
  2. Path.mkdir(parents=True, exist_ok=True) — 安全创建目录
  3. Path.read_text() / Path.write_text() — 一行读写文件
  4. Path.stat() — 获取文件信息(大小、修改时间等)
  5. Path.resolve() — 获取绝对路径并解析符号链接

pathlib vs os.path 对比分析

操作 os.path(旧式) pathlib(新式)
拼接路径 os.path.join(a, b, c) Path(a) / b / c
获取扩展名 os.path.splitext(f)[1] Path(f).suffix
获取文件名 os.path.basename(f) Path(f).name
获取父目录 os.path.dirname(f) Path(f).parent
判断是否存在 os.path.exists(f) Path(f).exists()
列出目录 os.listdir(d) Path(d).iterdir()
遍历匹配 glob.glob("*.py") Path(".").glob("*.py")

常见易错点

  1. / 运算符要求左边是 Path 对象,不能是字符串:"a" / "b" 会报错
  2. globrglob 返回生成器,不是列表,需要 list() 才能反复遍历
  3. Windows 下路径用 \,但 Path 对象的 / 运算符自动处理分隔符

实战案例

案例:项目文件扫描器

from pathlib import Path

# 扫描当前项目结构
project = Path(".")
print("=== 项目 Python 文件 ===")
py_files = sorted(project.rglob("*.py"))
for f in py_files[:10]:  # 只显示前10个
    size = f.stat().st_size
    print(f"  {f.relative_to(project)} ({size} 字节)")
print(f"  ... 共 {len(py_files)} 个 Python 文件")

# 查找所有 Markdown 文件
md_files = list(project.glob("**/*.md"))
print(f"\nMarkdown 文件: {len(md_files)} 个")

# 按扩展名统计
from collections import Counter
extensions = Counter(f.suffix for f in project.rglob("*") if f.is_file())
print(f"\n文件类型分布: {dict(extensions.most_common(5))}")

10. contextlib 模块 - 上下文管理器工具

知识点解析

概念定义:contextlib 模块提供了一组"上下文管理器工具",让你更方便地创建和使用 with 语句。上下文管理器确保资源(文件、锁、连接等)在使用后自动释放,即使发生异常也不会泄漏。

核心规则

  1. @contextmanager — 用生成器函数快速创建上下文管理器
  2. contextlib.suppress(Exception) — 静默抑制指定异常
  3. contextlib.ExitStack — 动态管理多个上下文(数量不确定时特别有用)
  4. contextlib.redirect_stdout / redirect_stderr — 临时重定向输出流

常见易错点

  1. @contextmanager 装饰的函数必须 yield 恰好一次
  2. yield 之后的代码(清理逻辑)即使在 with 块中发生异常也会执行
  3. suppress 会静默吞掉异常,只适合"不存在也不影响"的场景
  4. ExitStack__exit__ 时按 LIFO 顺序(后进先出)清理资源

实战案例

案例:自定义上下文管理器

import contextlib
import time

# === @contextmanager:计时器 ===
@contextlib.contextmanager
def timer(name):
    """计时上下文管理器"""
    start = time.perf_counter()
    print(f"[{name}] 开始...")
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        print(f"[{name}] 完成,耗时: {elapsed:.4f}s")

with timer("数据处理"):
    time.sleep(0.2)
    # 模拟数据处理...
    total = sum(range(1000000))

# === suppress:抑制异常 ===
with contextlib.suppress(FileNotFoundError):
    # 文件不存在也不报错
    open("nonexistent_file.txt", "r")

# === ExitStack:动态管理多个资源 ===
@contextlib.contextmanager
def acquire_resource(name):
    print(f"  获取资源: {name}")
    yield name
    print(f"  释放资源: {name}")

with contextlib.ExitStack() as stack:
    # 动态获取不确定数量的资源
    resources = ["数据库连接", "文件锁", "网络socket"]
    for r in resources:
        stack.enter_context(acquire_resource(r))
    print("所有资源已获取,执行业务逻辑...")
    # with 块结束时自动按 LIFO 顺序释放所有资源

# === redirect_stdout:捕获 print 输出 ===
import io

f = io.StringIO()
with contextlib.redirect_stdout(f):
    print("这行输出被重定向了")
    print("这行也是")

captured = f.getvalue()
print(f"捕获到的输出: {captured.strip()}")

标准库补充模块小结

模块 核心功能 最常用工具
functools 函数工具 lru_cache, partial, wraps, singledispatch
itertools 迭代器工具 chain, product, combinations, groupby, islice
subprocess 子进程管理 run(capture_output=True, text=True, check=True)
pathlib 路径操作 Path.glob(), read_text(), write_text(), stat()
contextlib 上下文管理 @contextmanager, suppress, ExitStack

学习建议

  1. 优先掌握 functools.lru_cacheitertools——日常编程中最高频
  2. subprocess.run() 是执行外部命令的标准方式,记住 capture_output=True, text=True 组合
  3. 新项目优先使用 pathlib 替代 os.path,代码更简洁
  4. contextlib.suppress 适合"尝试操作,失败也无所谓"的场景