Skip to content

🎯 高级魔法师实战:生成器、迭代器和协程练习

🎉 欢迎来到生成器、迭代器和协程的练习环节!通过这些精心设计的练习,你将巩固对Python中处理数据流和异步操作的理解,并提升实际应用能力。让我们开始吧!

📋 练习目标

  1. 掌握迭代器的概念和自定义迭代器的实现
  2. 熟练使用生成器函数和生成器表达式
  3. 理解并应用生成器的高级特性(send、throw、close方法)
  4. 掌握现代协程(async/await)的使用方法
  5. 能够在实际项目中应用这些特性提高代码性能和可读性

🚀 基础练习

练习1:自定义迭代器

题目要求:

  • 实现一个名为EvenNumbers的迭代器类,用于生成指定范围内的所有偶数
  • 迭代器类需要实现__iter__()__next__()方法
  • 当迭代结束时,应抛出StopIteration异常

示例代码:

python
class EvenNumbers:
    """生成指定范围内的所有偶数的迭代器"""
    def __init__(self, start, end):
        # 初始化起始值和结束值
        # 如果start是奇数,则从下一个偶数开始
        # 你的代码这里

    def __iter__(self):
        # 返回自身作为迭代器
        # 你的代码这里

    def __next__(self):
        # 返回下一个偶数
        # 如果超过结束值,抛出StopIteration异常
        # 你的代码这里

# 测试代码
evens = EvenNumbers(1, 10)
print("1到10之间的偶数:")
for num in evens:
    print(num, end=' ')
# 预期输出: 2 4 6 8 10

# 测试迭代器只能使用一次
print("\n再次遍历同一个迭代器:")
for num in evens:
    print(num, end=' ')
# 预期输出: 什么都不输出

参考实现:

python
class EvenNumbers:
    """生成指定范围内的所有偶数的迭代器"""
    def __init__(self, start, end):
        """初始化迭代器,设置起始值和结束值"""
        # 如果start是奇数,则从下一个偶数开始
        self.current = start if start % 2 == 0 else start + 1
        self.end = end
    
    def __iter__(self):
        """返回自身作为迭代器"""
        return self
    
    def __next__(self):
        """返回下一个偶数"""
        if self.current > self.end:
            raise StopIteration
        result = self.current
        self.current += 2
        return result

练习2:基本生成器函数

题目要求:

  • 实现一个名为count_up的生成器函数,用于生成从指定起始值开始的无限递增序列
  • 生成器函数应该接受一个可选参数step,用于指定递增的步长,默认为1
  • 生成器函数应该接受一个可选参数max_value,用于指定最大值,默认为None(无限序列)

示例代码:

python
def count_up(start, step=1, max_value=None):
    """生成从start开始的无限递增序列"""
    # 你的代码这里

# 测试代码
print("\n从5开始,步长为2的序列(取前5个值):")
counter = count_up(5, 2)
for _ in range(5):
    print(next(counter), end=' ')
# 预期输出: 5 7 9 11 13

print("\n\n从10开始,步长为3,最大值为20的序列:")
for num in count_up(10, 3, 20):
    print(num, end=' ')
# 预期输出: 10 13 16 19

参考实现:

python
def count_up(start, step=1, max_value=None):
    """生成从start开始的无限递增序列"""
    current = start
    while True:
        # 检查是否超过最大值
        if max_value is not None and current > max_value:
            break
        yield current
        current += step

练习3:生成器表达式

题目要求:

  • 使用生成器表达式重写以下列表推导式
  • 比较两者的内存使用情况
  • 解释为什么在处理大量数据时生成器表达式更有优势

示例代码:

python
# 原列表推导式
squares_list = [x*x for x in range(1000000)]

# 使用生成器表达式重写
# 你的代码这里

# 比较内存使用情况
import sys
print(f"列表推导式内存使用: {sys.getsizeof(squares_list)} 字节")
print(f"生成器表达式内存使用: {sys.getsizeof(squares_generator)} 字节")

# 解释为什么生成器表达式更适合处理大量数据
# 你的解释这里

参考实现:

python
# 原列表推导式
squares_list = [x*x for x in range(1000000)]

# 使用生成器表达式重写
squares_generator = (x*x for x in range(1000000))

# 比较内存使用情况
import sys
print(f"列表推导式内存使用: {sys.getsizeof(squares_list)} 字节")
print(f"生成器表达式内存使用: {sys.getsizeof(squares_generator)} 字节")

# 解释
print("\n为什么生成器表达式更适合处理大量数据:")
print("1. 列表推导式会一次性创建完整的列表并加载到内存中")
print("2. 生成器表达式只在需要时才计算下一个值,内存占用小")
print("3. 对于大数据集,生成器表达式可以避免内存溢出问题")
print("4. 生成器表达式更加高效,特别是当你只需要遍历数据一次时")

练习4:高级生成器方法(send、throw、close)

题目要求:

  • 实现一个名为interactive_counter的生成器函数,它可以接收外部发送的值来控制计数
  • 生成器应该支持以下操作:
    1. 初始状态下,生成器从0开始计数
    2. 当接收到字符串'reset'时,计数重置为0
    3. 当接收到整数n时,计数增加n
    4. 当接收到字符串'stop'时,生成器停止并抛出StopIteration异常
  • 使用send()方法向生成器发送命令
  • 使用close()方法手动关闭生成器

示例代码:

python
def interactive_counter():
    """一个可以通过send()方法控制的计数器生成器"""
    count = 0
    command = yield count  # 第一次yield返回初始计数,并等待命令
    
    while True:
        # 根据接收到的命令执行不同操作
        # 你的代码这里

# 测试代码
print("\n测试交互式计数器:")
counter = interactive_counter()

# 启动生成器(获取初始值)
print(next(counter))  # 预期输出: 0

# 发送各种命令
try:
    print(counter.send(5))  # 增加5,预期输出: 5
    print(counter.send(3))  # 增加3,预期输出: 8
    print(counter.send('reset'))  # 重置,预期输出: 0
    print(counter.send(10))  # 增加10,预期输出: 10
    print(counter.send('stop'))  # 停止,应该抛出StopIteration异常
except StopIteration:
    print("生成器已停止")

# 测试手动关闭生成器
print("\n测试手动关闭生成器:")
counter2 = interactive_counter()
print(next(counter2))  # 预期输出: 0
counter2.close()

# 尝试在关闭后发送命令
try:
    counter2.send(5)
except StopIteration:
    print("生成器已关闭")

参考实现:

python
def interactive_counter():
    """一个可以通过send()方法控制的计数器生成器"""
    count = 0
    command = yield count  # 第一次yield返回初始计数,并等待命令
    
    while True:
        if command == 'reset':
            count = 0
        elif command == 'stop':
            # 清理操作(如果有的话)
            break
        elif isinstance(command, int):
            count += command
        else:
            # 忽略未知命令
            pass
        
        # 再次yield当前计数,并等待下一个命令
        command = yield count

练习5:使用yield from组合生成器

题目要求:

  • 实现三个生成器函数:letters_generator(生成A-Z的字母)、numbers_generator(生成1-10的数字)和symbols_generator(生成常见标点符号)
  • 实现一个名为combined_generator的生成器函数,使用yield from语句组合上述三个生成器
  • 在组合生成器中,添加适当的分隔信息,以表明正在生成的是哪种类型的数据

示例代码:

python
def letters_generator():
    """生成A-Z的字母"""
    # 你的代码这里

def numbers_generator():
    """生成1-10的数字"""
    # 你的代码这里

def symbols_generator():
    """生成常见标点符号"""
    # 你的代码这里

def combined_generator():
    """组合三个生成器"""
    # 你的代码这里

# 测试代码
print("\n测试组合生成器:")
for item in combined_generator():
    print(item, end=' ')
# 预期输出类似于:
# [字母] A B C ... Z
# [数字] 1 2 3 ... 10
# [符号] ! @ # $ %

参考实现:

python
def letters_generator():
    """生成A-Z的字母"""
    for letter in 'ABCDEFGHIJKLMNOPQRSTUVWXYZ':
        yield letter

def numbers_generator():
    """生成1-10的数字"""
    for number in range(1, 11):
        yield number

def symbols_generator():
    """生成常见标点符号"""
    for symbol in '!@#$%^&*()':
        yield symbol

def combined_generator():
    """组合三个生成器"""
    yield "[字母]"
    yield from letters_generator()
    yield "\n[数字]"
    yield from numbers_generator()
    yield "\n[符号]"
    yield from symbols_generator()

🎯 挑战练习

练习6:实现一个质数生成器

题目要求:

  • 实现一个名为prime_generator的生成器函数,用于生成无限个质数
  • 使用埃拉托斯特尼筛法或其他高效算法来判断一个数是否为质数
  • 添加一个可选参数max_count,用于限制生成的质数数量

示例代码:

python
def prime_generator(max_count=None):
    """生成无限个质数的生成器"""
    # 你的代码这里

# 测试代码
print("\n生成前10个质数:")
primes = prime_generator(10)
for prime in primes:
    print(prime, end=' ')
# 预期输出: 2 3 5 7 11 13 17 19 23 29

参考实现:

python
def prime_generator(max_count=None):
    """生成无限个质数的生成器"""
    count = 0
    # 从2开始检查(第一个质数)
    num = 2
    
    while max_count is None or count < max_count:
        is_prime = True
        # 只需检查到num的平方根即可
        for i in range(2, int(num**0.5) + 1):
            if num % i == 0:
                is_prime = False
                break
        
        if is_prime:
            yield num
            count += 1
        
        num += 1

练习7:实现一个可重置的斐波那契生成器

题目要求:

  • 实现一个名为resettable_fibonacci的生成器函数,用于生成斐波那契数列
  • 生成器应该支持以下操作:
    1. 初始状态下,生成器从0, 1开始生成斐波那契数列
    2. 当接收到字符串'reset'时,生成器重置为初始状态
    3. 当接收到一个元组(a, b)时,生成器从指定的两个数开始生成斐波那契数列
  • 使用send()方法向生成器发送命令

示例代码:

python
def resettable_fibonacci():
    """一个可重置的斐波那契数列生成器"""
    # 你的代码这里

# 测试代码
print("\n测试可重置的斐波那契生成器:")
fib = resettable_fibonacci()

# 获取前5个斐波那契数
print("前5个斐波那契数:")
for _ in range(5):
    print(next(fib), end=' ')
# 预期输出: 0 1 1 2 3

# 重置生成器并获取前3个斐波那契数
print("\n\n重置后获取前3个斐波那契数:")
print(fib.send('reset'))  # 重置并获取第一个数
print(next(fib))  # 获取第二个数
print(next(fib))  # 获取第三个数
# 预期输出: 0 1 1

# 从指定的两个数开始生成斐波那契数列
print("\n\n从(10, 20)开始生成前4个斐波那契数:")
print(fib.send((10, 20)))  # 设置起始值并获取第一个数
print(next(fib))  # 获取第二个数
print(next(fib))  # 获取第三个数
print(next(fib))  # 获取第四个数
# 预期输出: 10 20 30 50 80

参考实现:

python
def resettable_fibonacci():
    """一个可重置的斐波那契数列生成器"""
    a, b = 0, 1
    command = yield a  # 第一次yield返回0,并等待命令
    
    while True:
        if command == 'reset':
            a, b = 0, 1
            command = yield a  # 重置后返回0,并等待命令
        elif isinstance(command, tuple) and len(command) == 2:
            a, b = command
            command = yield a  # 设置新的起始值后返回a,并等待命令
        else:
            a, b = b, a + b
            command = yield a  # 返回下一个斐波那契数,并等待命令

练习8:实现异步计数器(现代协程)

题目要求:

  • 实现一个名为async_counter的协程函数,它可以按照指定的时间间隔递增计数
  • 协程应该支持以下参数:
    1. interval:计数递增的时间间隔(秒)
    2. max_count:最大计数值,达到后协程结束
    3. callback:一个可选的回调函数,每次计数递增时调用
  • 使用async/await语法和asyncio.sleep()函数实现
  • 编写一个主协程main()来测试异步计数器

示例代码:

python
import asyncio

async def async_counter(interval, max_count, callback=None):
    """异步计数器"""
    # 你的代码这里

async def main():
    """测试异步计数器的主协程"""
    # 定义一个简单的回调函数
    def on_count(count):
        print(f"计数: {count}")
    
    # 运行异步计数器(每秒计数一次,最多计数5次)
    # 你的代码这里

# 运行主协程
print("\n测试异步计数器:")
asyncio.run(main())
# 预期输出(每秒输出一次):
# 计数: 1
# 计数: 2
# 计数: 3
# 计数: 4
# 计数: 5
# 计数完成

参考实现:

python
import asyncio

async def async_counter(interval, max_count, callback=None):
    """异步计数器"""
    count = 0
    while count < max_count:
        await asyncio.sleep(interval)
        count += 1
        if callback:
            callback(count)
    return count

async def main():
    """测试异步计数器的主协程"""
    # 定义一个简单的回调函数
    def on_count(count):
        print(f"计数: {count}")
    
    # 运行异步计数器(每秒计数一次,最多计数5次)
    total = await async_counter(1, 5, on_count)
    print(f"计数完成,总计数值: {total}")

# 运行主协程
# 注意:实际运行需要取消下面的注释
# print("\n测试异步计数器:")
# asyncio.run(main())

练习9:实现异步数据处理管道

题目要求:

  • 实现一个异步数据处理管道,包括三个协程函数:
    1. read_file_async(file_path):异步读取文件内容
    2. process_data_async(data):异步处理数据(例如,计算单词频率)
    3. write_results_async(results, output_file):异步写入处理结果
  • 使用aiofiles库进行异步文件操作(需要安装:pip install aiofiles
  • 编写一个主协程main()来测试整个数据处理管道

示例代码:

python
import asyncio
import aiofiles
import collections
import re

async def read_file_async(file_path):
    """异步读取文件内容"""
    # 你的代码这里

async def process_data_async(data):
    """异步处理数据(计算单词频率)"""
    # 你的代码这里

async def write_results_async(results, output_file):
    """异步写入处理结果"""
    # 你的代码这里

async def main():
    """测试异步数据处理管道的主协程"""
    # 测试文件路径(确保文件存在)
    input_file = "sample_text.txt"
    output_file = "word_frequencies.txt"
    
    try:
        # 创建一个简单的测试文件
        # 你的代码这里
        
        # 运行数据处理管道
        # 你的代码这里
        
        print(f"数据处理完成,结果已保存到 {output_file}")
    except Exception as e:
        print(f"处理过程中发生错误: {e}")

# 运行主协程
print("\n测试异步数据处理管道:")
asyncio.run(main())

参考实现:

python
import asyncio
import aiofiles
import collections
import re

async def read_file_async(file_path):
    """异步读取文件内容"""
    async with aiofiles.open(file_path, 'r', encoding='utf-8') as file:
        return await file.read()

async def process_data_async(data):
    """异步处理数据(计算单词频率)"""
    # 使用正则表达式提取所有单词(转小写)
    words = re.findall(r'\b\w+\b', data.lower())
    # 计算单词频率
    word_counts = collections.Counter(words)
    # 按频率降序排序
    sorted_words = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)
    return sorted_words

async def write_results_async(results, output_file):
    """异步写入处理结果"""
    async with aiofiles.open(output_file, 'w', encoding='utf-8') as file:
        await file.write("单词频率统计结果:\n")
        await file.write("-----------------\n")
        for word, count in results:
            await file.write(f"{word}: {count}\n")

async def main():
    """测试异步数据处理管道的主协程"""
    # 测试文件路径
    input_file = "sample_text.txt"
    output_file = "word_frequencies.txt"
    
    try:
        # 创建一个简单的测试文件
        async with aiofiles.open(input_file, 'w', encoding='utf-8') as file:
            await file.write("This is a sample text. This text contains some words.")
            await file.write("The purpose of this text is to test the async data processing pipeline.")
        
        # 运行数据处理管道
        data = await read_file_async(input_file)
        results = await process_data_async(data)
        await write_results_async(results, output_file)
        
        print(f"数据处理完成,结果已保存到 {output_file}")
    except Exception as e:
        print(f"处理过程中发生错误: {e}")

# 运行主协程
# 注意:实际运行需要取消下面的注释
# print("\n测试异步数据处理管道:")
# asyncio.run(main())

练习10:实现异步任务管理器

题目要求:

  • 实现一个名为AsyncTaskManager的类,用于管理和执行异步任务
  • 类应该支持以下功能:
    1. add_task(task, *args, **kwargs):添加一个异步任务
    2. run_all():并发执行所有添加的任务
    3. run_with_timeout(task, timeout, *args, **kwargs):执行单个任务,并设置超时时间
  • 使用asyncio库实现并发执行和超时处理
  • 编写一个主协程main()来测试异步任务管理器

示例代码:

python
import asyncio

class AsyncTaskManager:
    """异步任务管理器"""
    def __init__(self):
        # 初始化任务列表
        # 你的代码这里
    
    def add_task(self, task, *args, **kwargs):
        """添加一个异步任务"""
        # 你的代码这里
        return self  # 支持链式调用
    
    async def run_all(self):
        """并发执行所有添加的任务"""
        # 你的代码这里
    
    async def run_with_timeout(self, task, timeout, *args, **kwargs):
        """执行单个任务,并设置超时时间"""
        # 你的代码这里

# 测试用的异步任务
async def sample_task(name, delay):
    """一个示例异步任务"""
    print(f"任务 {name} 开始执行,延迟 {delay} 秒")
    await asyncio.sleep(delay)
    print(f"任务 {name} 执行完成")
    return f"任务 {name} 的结果"

async def main():
    """测试异步任务管理器的主协程"""
    # 创建任务管理器
    # 你的代码这里
    
    # 添加多个任务
    # 你的代码这里
    
    # 并发执行所有任务
    # 你的代码这里
    
    # 测试超时功能
    # 你的代码这里

# 运行主协程
print("\n测试异步任务管理器:")
asyncio.run(main())

参考实现:

python
import asyncio

class AsyncTaskManager:
    """异步任务管理器"""
    def __init__(self):
        """初始化任务管理器"""
        self.tasks = []
    
    def add_task(self, task, *args, **kwargs):
        """添加一个异步任务"""
        self.tasks.append((task, args, kwargs))
        return self  # 支持链式调用
    
    async def run_all(self):
        """并发执行所有添加的任务"""
        # 创建所有任务
        async_tasks = [task(*args, **kwargs) for task, args, kwargs in self.tasks]
        # 并发执行所有任务
        results = await asyncio.gather(*async_tasks)
        return results
    
    async def run_with_timeout(self, task, timeout, *args, **kwargs):
        """执行单个任务,并设置超时时间"""
        try:
            # 使用wait_for设置超时
            return await asyncio.wait_for(task(*args, **kwargs), timeout=timeout)
        except asyncio.TimeoutError:
            print(f"任务执行超时({timeout}秒)")
            return None

# 测试用的异步任务
async def sample_task(name, delay):
    """一个示例异步任务"""
    print(f"任务 {name} 开始执行,延迟 {delay} 秒")
    await asyncio.sleep(delay)
    print(f"任务 {name} 执行完成")
    return f"任务 {name} 的结果"

async def main():
    """测试异步任务管理器的主协程"""
    # 创建任务管理器
    manager = AsyncTaskManager()
    
    # 添加多个任务
    manager.add_task(sample_task, "A", 1)
    manager.add_task(sample_task, "B", 2)
    manager.add_task(sample_task, "C", 1.5)
    
    # 并发执行所有任务
    print("\n并发执行所有任务:")
    results = await manager.run_all()
    print(f"所有任务执行完成,结果:{results}")
    
    # 测试超时功能
    print("\n测试任务超时功能:")
    # 正常完成的任务
    result1 = await manager.run_with_timeout(sample_task, 2, "D", 1)
    print(f"任务D结果:{result1}")
    
    # 超时的任务
    result2 = await manager.run_with_timeout(sample_task, 1, "E", 2)
    print(f"任务E结果:{result2}")

# 运行主协程
# 注意:实际运行需要取消下面的注释
# print("\n测试异步任务管理器:")
# asyncio.run(main())

💡 实践应用提示

  1. 迭代器与生成器的选择

    • 如果你需要自定义迭代行为,并且需要维护复杂的状态,可以实现一个自定义迭代器类
    • 对于简单的惰性计算场景,生成器函数或生成器表达式通常是更好的选择,因为它们更简洁、更容易理解
  2. 生成器的高级特性应用场景

    • send()方法可以用于实现双向通信的生成器,例如状态机、解析器等
    • throw()方法可以用于向生成器中注入异常,实现错误处理或特殊控制流
    • close()方法可以用于清理生成器资源,特别是当生成器包含文件句柄等资源时
  3. 协程的实际应用

    • 网络应用:Web服务器、API客户端、WebSocket服务器等
    • 数据处理:大规模数据处理、ETL工作流等
    • 实时系统:聊天应用、游戏服务器、实时监控系统等
    • I/O密集型任务:文件操作、数据库操作、网络请求等
  4. 性能优化建议

    • 对于大量数据的处理,优先使用生成器表达式而不是列表推导式
    • 对于I/O密集型应用,使用异步编程可以显著提高性能
    • 避免在生成器或协程中进行阻塞操作
    • 使用asyncio.gather()并发执行多个独立的异步任务
  5. 常见陷阱与解决方案

    • 迭代器只能遍历一次:如果需要多次遍历,创建一个新的迭代器或转换为列表
    • 忘记启动生成器:在使用send()方法之前,需要先调用next()方法启动生成器
    • 阻塞异步操作:确保在异步代码中只使用异步版本的库函数
    • 过度使用异步:对于CPU密集型任务,多线程或多进程可能是更好的选择

📚 学习资源推荐

  1. 官方文档

  2. 经典书籍

    • 《流畅的Python》 by Luciano Ramalho(第14章到第18章详细介绍了迭代器、生成器和协程)
    • 《Python Cookbook》 by David Beazley & Brian K. Jones(第4章包含了许多关于迭代器和生成器的实用技巧)
  3. 网络资源

  4. 异步编程库

通过这些练习,你已经深入了解了Python中生成器、迭代器和协程的强大功能,并掌握了如何在实际项目中应用这些特性。这些工具不仅可以使你的代码更高效、更优雅,还能帮助你处理各种复杂的数据处理和异步操作场景。 通过这些练习,你已经深入了解了Python中生成器、迭代器和协程的强大功能,并掌握了如何在实际项目中应用这些特性。这些工具不仅可以使你的代码更高效、更优雅,还能帮助你处理各种复杂的数据处理和异步操作场景。

记住,生成器和迭代器是处理数据流的强大工具,而协程则是构建高性能异步应用的基础。通过不断地练习和实践,你会逐渐掌握这些特性的精髓,编写出更加专业、高效的Python代码。

恭喜你完成了进阶阶段的所有学习内容!在这个阶段,你已经掌握了Python的核心应用和工具链,包括面向对象编程高级特性、异常处理进阶、模块和包管理、函数式编程高级特性以及生成器、迭代器和协程。这些知识将为你进一步深入Python编程打下坚实的基础。

继续加油,享受Python编程的乐趣吧!🚀

© 2025 技术博客. All rights reserved by 老周有AI