🎯 高级魔法师实战:生成器、迭代器和协程练习
🎉 欢迎来到生成器、迭代器和协程的练习环节!通过这些精心设计的练习,你将巩固对Python中处理数据流和异步操作的理解,并提升实际应用能力。让我们开始吧!
📋 练习目标
- 掌握迭代器的概念和自定义迭代器的实现
- 熟练使用生成器函数和生成器表达式
- 理解并应用生成器的高级特性(send、throw、close方法)
- 掌握现代协程(async/await)的使用方法
- 能够在实际项目中应用这些特性提高代码性能和可读性
🚀 基础练习
练习1:自定义迭代器
题目要求:
- 实现一个名为
EvenNumbers的迭代器类,用于生成指定范围内的所有偶数 - 迭代器类需要实现
__iter__()和__next__()方法 - 当迭代结束时,应抛出
StopIteration异常
示例代码:
python
class EvenNumbers:
"""生成指定范围内的所有偶数的迭代器"""
def __init__(self, start, end):
# 初始化起始值和结束值
# 如果start是奇数,则从下一个偶数开始
# 你的代码这里
def __iter__(self):
# 返回自身作为迭代器
# 你的代码这里
def __next__(self):
# 返回下一个偶数
# 如果超过结束值,抛出StopIteration异常
# 你的代码这里
# 测试代码
evens = EvenNumbers(1, 10)
print("1到10之间的偶数:")
for num in evens:
print(num, end=' ')
# 预期输出: 2 4 6 8 10
# 测试迭代器只能使用一次
print("\n再次遍历同一个迭代器:")
for num in evens:
print(num, end=' ')
# 预期输出: 什么都不输出参考实现:
python
class EvenNumbers:
"""生成指定范围内的所有偶数的迭代器"""
def __init__(self, start, end):
"""初始化迭代器,设置起始值和结束值"""
# 如果start是奇数,则从下一个偶数开始
self.current = start if start % 2 == 0 else start + 1
self.end = end
def __iter__(self):
"""返回自身作为迭代器"""
return self
def __next__(self):
"""返回下一个偶数"""
if self.current > self.end:
raise StopIteration
result = self.current
self.current += 2
return result练习2:基本生成器函数
题目要求:
- 实现一个名为
count_up的生成器函数,用于生成从指定起始值开始的无限递增序列 - 生成器函数应该接受一个可选参数
step,用于指定递增的步长,默认为1 - 生成器函数应该接受一个可选参数
max_value,用于指定最大值,默认为None(无限序列)
示例代码:
python
def count_up(start, step=1, max_value=None):
"""生成从start开始的无限递增序列"""
# 你的代码这里
# 测试代码
print("\n从5开始,步长为2的序列(取前5个值):")
counter = count_up(5, 2)
for _ in range(5):
print(next(counter), end=' ')
# 预期输出: 5 7 9 11 13
print("\n\n从10开始,步长为3,最大值为20的序列:")
for num in count_up(10, 3, 20):
print(num, end=' ')
# 预期输出: 10 13 16 19参考实现:
python
def count_up(start, step=1, max_value=None):
"""生成从start开始的无限递增序列"""
current = start
while True:
# 检查是否超过最大值
if max_value is not None and current > max_value:
break
yield current
current += step练习3:生成器表达式
题目要求:
- 使用生成器表达式重写以下列表推导式
- 比较两者的内存使用情况
- 解释为什么在处理大量数据时生成器表达式更有优势
示例代码:
python
# 原列表推导式
squares_list = [x*x for x in range(1000000)]
# 使用生成器表达式重写
# 你的代码这里
# 比较内存使用情况
import sys
print(f"列表推导式内存使用: {sys.getsizeof(squares_list)} 字节")
print(f"生成器表达式内存使用: {sys.getsizeof(squares_generator)} 字节")
# 解释为什么生成器表达式更适合处理大量数据
# 你的解释这里参考实现:
python
# 原列表推导式
squares_list = [x*x for x in range(1000000)]
# 使用生成器表达式重写
squares_generator = (x*x for x in range(1000000))
# 比较内存使用情况
import sys
print(f"列表推导式内存使用: {sys.getsizeof(squares_list)} 字节")
print(f"生成器表达式内存使用: {sys.getsizeof(squares_generator)} 字节")
# 解释
print("\n为什么生成器表达式更适合处理大量数据:")
print("1. 列表推导式会一次性创建完整的列表并加载到内存中")
print("2. 生成器表达式只在需要时才计算下一个值,内存占用小")
print("3. 对于大数据集,生成器表达式可以避免内存溢出问题")
print("4. 生成器表达式更加高效,特别是当你只需要遍历数据一次时")练习4:高级生成器方法(send、throw、close)
题目要求:
- 实现一个名为
interactive_counter的生成器函数,它可以接收外部发送的值来控制计数 - 生成器应该支持以下操作:
- 初始状态下,生成器从0开始计数
- 当接收到字符串
'reset'时,计数重置为0 - 当接收到整数
n时,计数增加n - 当接收到字符串
'stop'时,生成器停止并抛出StopIteration异常
- 使用
send()方法向生成器发送命令 - 使用
close()方法手动关闭生成器
示例代码:
python
def interactive_counter():
"""一个可以通过send()方法控制的计数器生成器"""
count = 0
command = yield count # 第一次yield返回初始计数,并等待命令
while True:
# 根据接收到的命令执行不同操作
# 你的代码这里
# 测试代码
print("\n测试交互式计数器:")
counter = interactive_counter()
# 启动生成器(获取初始值)
print(next(counter)) # 预期输出: 0
# 发送各种命令
try:
print(counter.send(5)) # 增加5,预期输出: 5
print(counter.send(3)) # 增加3,预期输出: 8
print(counter.send('reset')) # 重置,预期输出: 0
print(counter.send(10)) # 增加10,预期输出: 10
print(counter.send('stop')) # 停止,应该抛出StopIteration异常
except StopIteration:
print("生成器已停止")
# 测试手动关闭生成器
print("\n测试手动关闭生成器:")
counter2 = interactive_counter()
print(next(counter2)) # 预期输出: 0
counter2.close()
# 尝试在关闭后发送命令
try:
counter2.send(5)
except StopIteration:
print("生成器已关闭")参考实现:
python
def interactive_counter():
"""一个可以通过send()方法控制的计数器生成器"""
count = 0
command = yield count # 第一次yield返回初始计数,并等待命令
while True:
if command == 'reset':
count = 0
elif command == 'stop':
# 清理操作(如果有的话)
break
elif isinstance(command, int):
count += command
else:
# 忽略未知命令
pass
# 再次yield当前计数,并等待下一个命令
command = yield count练习5:使用yield from组合生成器
题目要求:
- 实现三个生成器函数:
letters_generator(生成A-Z的字母)、numbers_generator(生成1-10的数字)和symbols_generator(生成常见标点符号) - 实现一个名为
combined_generator的生成器函数,使用yield from语句组合上述三个生成器 - 在组合生成器中,添加适当的分隔信息,以表明正在生成的是哪种类型的数据
示例代码:
python
def letters_generator():
"""生成A-Z的字母"""
# 你的代码这里
def numbers_generator():
"""生成1-10的数字"""
# 你的代码这里
def symbols_generator():
"""生成常见标点符号"""
# 你的代码这里
def combined_generator():
"""组合三个生成器"""
# 你的代码这里
# 测试代码
print("\n测试组合生成器:")
for item in combined_generator():
print(item, end=' ')
# 预期输出类似于:
# [字母] A B C ... Z
# [数字] 1 2 3 ... 10
# [符号] ! @ # $ %参考实现:
python
def letters_generator():
"""生成A-Z的字母"""
for letter in 'ABCDEFGHIJKLMNOPQRSTUVWXYZ':
yield letter
def numbers_generator():
"""生成1-10的数字"""
for number in range(1, 11):
yield number
def symbols_generator():
"""生成常见标点符号"""
for symbol in '!@#$%^&*()':
yield symbol
def combined_generator():
"""组合三个生成器"""
yield "[字母]"
yield from letters_generator()
yield "\n[数字]"
yield from numbers_generator()
yield "\n[符号]"
yield from symbols_generator()🎯 挑战练习
练习6:实现一个质数生成器
题目要求:
- 实现一个名为
prime_generator的生成器函数,用于生成无限个质数 - 使用埃拉托斯特尼筛法或其他高效算法来判断一个数是否为质数
- 添加一个可选参数
max_count,用于限制生成的质数数量
示例代码:
python
def prime_generator(max_count=None):
"""生成无限个质数的生成器"""
# 你的代码这里
# 测试代码
print("\n生成前10个质数:")
primes = prime_generator(10)
for prime in primes:
print(prime, end=' ')
# 预期输出: 2 3 5 7 11 13 17 19 23 29参考实现:
python
def prime_generator(max_count=None):
"""生成无限个质数的生成器"""
count = 0
# 从2开始检查(第一个质数)
num = 2
while max_count is None or count < max_count:
is_prime = True
# 只需检查到num的平方根即可
for i in range(2, int(num**0.5) + 1):
if num % i == 0:
is_prime = False
break
if is_prime:
yield num
count += 1
num += 1练习7:实现一个可重置的斐波那契生成器
题目要求:
- 实现一个名为
resettable_fibonacci的生成器函数,用于生成斐波那契数列 - 生成器应该支持以下操作:
- 初始状态下,生成器从0, 1开始生成斐波那契数列
- 当接收到字符串
'reset'时,生成器重置为初始状态 - 当接收到一个元组
(a, b)时,生成器从指定的两个数开始生成斐波那契数列
- 使用
send()方法向生成器发送命令
示例代码:
python
def resettable_fibonacci():
"""一个可重置的斐波那契数列生成器"""
# 你的代码这里
# 测试代码
print("\n测试可重置的斐波那契生成器:")
fib = resettable_fibonacci()
# 获取前5个斐波那契数
print("前5个斐波那契数:")
for _ in range(5):
print(next(fib), end=' ')
# 预期输出: 0 1 1 2 3
# 重置生成器并获取前3个斐波那契数
print("\n\n重置后获取前3个斐波那契数:")
print(fib.send('reset')) # 重置并获取第一个数
print(next(fib)) # 获取第二个数
print(next(fib)) # 获取第三个数
# 预期输出: 0 1 1
# 从指定的两个数开始生成斐波那契数列
print("\n\n从(10, 20)开始生成前4个斐波那契数:")
print(fib.send((10, 20))) # 设置起始值并获取第一个数
print(next(fib)) # 获取第二个数
print(next(fib)) # 获取第三个数
print(next(fib)) # 获取第四个数
# 预期输出: 10 20 30 50 80参考实现:
python
def resettable_fibonacci():
"""一个可重置的斐波那契数列生成器"""
a, b = 0, 1
command = yield a # 第一次yield返回0,并等待命令
while True:
if command == 'reset':
a, b = 0, 1
command = yield a # 重置后返回0,并等待命令
elif isinstance(command, tuple) and len(command) == 2:
a, b = command
command = yield a # 设置新的起始值后返回a,并等待命令
else:
a, b = b, a + b
command = yield a # 返回下一个斐波那契数,并等待命令练习8:实现异步计数器(现代协程)
题目要求:
- 实现一个名为
async_counter的协程函数,它可以按照指定的时间间隔递增计数 - 协程应该支持以下参数:
interval:计数递增的时间间隔(秒)max_count:最大计数值,达到后协程结束callback:一个可选的回调函数,每次计数递增时调用
- 使用
async/await语法和asyncio.sleep()函数实现 - 编写一个主协程
main()来测试异步计数器
示例代码:
python
import asyncio
async def async_counter(interval, max_count, callback=None):
"""异步计数器"""
# 你的代码这里
async def main():
"""测试异步计数器的主协程"""
# 定义一个简单的回调函数
def on_count(count):
print(f"计数: {count}")
# 运行异步计数器(每秒计数一次,最多计数5次)
# 你的代码这里
# 运行主协程
print("\n测试异步计数器:")
asyncio.run(main())
# 预期输出(每秒输出一次):
# 计数: 1
# 计数: 2
# 计数: 3
# 计数: 4
# 计数: 5
# 计数完成参考实现:
python
import asyncio
async def async_counter(interval, max_count, callback=None):
"""异步计数器"""
count = 0
while count < max_count:
await asyncio.sleep(interval)
count += 1
if callback:
callback(count)
return count
async def main():
"""测试异步计数器的主协程"""
# 定义一个简单的回调函数
def on_count(count):
print(f"计数: {count}")
# 运行异步计数器(每秒计数一次,最多计数5次)
total = await async_counter(1, 5, on_count)
print(f"计数完成,总计数值: {total}")
# 运行主协程
# 注意:实际运行需要取消下面的注释
# print("\n测试异步计数器:")
# asyncio.run(main())练习9:实现异步数据处理管道
题目要求:
- 实现一个异步数据处理管道,包括三个协程函数:
read_file_async(file_path):异步读取文件内容process_data_async(data):异步处理数据(例如,计算单词频率)write_results_async(results, output_file):异步写入处理结果
- 使用
aiofiles库进行异步文件操作(需要安装:pip install aiofiles) - 编写一个主协程
main()来测试整个数据处理管道
示例代码:
python
import asyncio
import aiofiles
import collections
import re
async def read_file_async(file_path):
"""异步读取文件内容"""
# 你的代码这里
async def process_data_async(data):
"""异步处理数据(计算单词频率)"""
# 你的代码这里
async def write_results_async(results, output_file):
"""异步写入处理结果"""
# 你的代码这里
async def main():
"""测试异步数据处理管道的主协程"""
# 测试文件路径(确保文件存在)
input_file = "sample_text.txt"
output_file = "word_frequencies.txt"
try:
# 创建一个简单的测试文件
# 你的代码这里
# 运行数据处理管道
# 你的代码这里
print(f"数据处理完成,结果已保存到 {output_file}")
except Exception as e:
print(f"处理过程中发生错误: {e}")
# 运行主协程
print("\n测试异步数据处理管道:")
asyncio.run(main())参考实现:
python
import asyncio
import aiofiles
import collections
import re
async def read_file_async(file_path):
"""异步读取文件内容"""
async with aiofiles.open(file_path, 'r', encoding='utf-8') as file:
return await file.read()
async def process_data_async(data):
"""异步处理数据(计算单词频率)"""
# 使用正则表达式提取所有单词(转小写)
words = re.findall(r'\b\w+\b', data.lower())
# 计算单词频率
word_counts = collections.Counter(words)
# 按频率降序排序
sorted_words = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)
return sorted_words
async def write_results_async(results, output_file):
"""异步写入处理结果"""
async with aiofiles.open(output_file, 'w', encoding='utf-8') as file:
await file.write("单词频率统计结果:\n")
await file.write("-----------------\n")
for word, count in results:
await file.write(f"{word}: {count}\n")
async def main():
"""测试异步数据处理管道的主协程"""
# 测试文件路径
input_file = "sample_text.txt"
output_file = "word_frequencies.txt"
try:
# 创建一个简单的测试文件
async with aiofiles.open(input_file, 'w', encoding='utf-8') as file:
await file.write("This is a sample text. This text contains some words.")
await file.write("The purpose of this text is to test the async data processing pipeline.")
# 运行数据处理管道
data = await read_file_async(input_file)
results = await process_data_async(data)
await write_results_async(results, output_file)
print(f"数据处理完成,结果已保存到 {output_file}")
except Exception as e:
print(f"处理过程中发生错误: {e}")
# 运行主协程
# 注意:实际运行需要取消下面的注释
# print("\n测试异步数据处理管道:")
# asyncio.run(main())练习10:实现异步任务管理器
题目要求:
- 实现一个名为
AsyncTaskManager的类,用于管理和执行异步任务 - 类应该支持以下功能:
add_task(task, *args, **kwargs):添加一个异步任务run_all():并发执行所有添加的任务run_with_timeout(task, timeout, *args, **kwargs):执行单个任务,并设置超时时间
- 使用
asyncio库实现并发执行和超时处理 - 编写一个主协程
main()来测试异步任务管理器
示例代码:
python
import asyncio
class AsyncTaskManager:
"""异步任务管理器"""
def __init__(self):
# 初始化任务列表
# 你的代码这里
def add_task(self, task, *args, **kwargs):
"""添加一个异步任务"""
# 你的代码这里
return self # 支持链式调用
async def run_all(self):
"""并发执行所有添加的任务"""
# 你的代码这里
async def run_with_timeout(self, task, timeout, *args, **kwargs):
"""执行单个任务,并设置超时时间"""
# 你的代码这里
# 测试用的异步任务
async def sample_task(name, delay):
"""一个示例异步任务"""
print(f"任务 {name} 开始执行,延迟 {delay} 秒")
await asyncio.sleep(delay)
print(f"任务 {name} 执行完成")
return f"任务 {name} 的结果"
async def main():
"""测试异步任务管理器的主协程"""
# 创建任务管理器
# 你的代码这里
# 添加多个任务
# 你的代码这里
# 并发执行所有任务
# 你的代码这里
# 测试超时功能
# 你的代码这里
# 运行主协程
print("\n测试异步任务管理器:")
asyncio.run(main())参考实现:
python
import asyncio
class AsyncTaskManager:
"""异步任务管理器"""
def __init__(self):
"""初始化任务管理器"""
self.tasks = []
def add_task(self, task, *args, **kwargs):
"""添加一个异步任务"""
self.tasks.append((task, args, kwargs))
return self # 支持链式调用
async def run_all(self):
"""并发执行所有添加的任务"""
# 创建所有任务
async_tasks = [task(*args, **kwargs) for task, args, kwargs in self.tasks]
# 并发执行所有任务
results = await asyncio.gather(*async_tasks)
return results
async def run_with_timeout(self, task, timeout, *args, **kwargs):
"""执行单个任务,并设置超时时间"""
try:
# 使用wait_for设置超时
return await asyncio.wait_for(task(*args, **kwargs), timeout=timeout)
except asyncio.TimeoutError:
print(f"任务执行超时({timeout}秒)")
return None
# 测试用的异步任务
async def sample_task(name, delay):
"""一个示例异步任务"""
print(f"任务 {name} 开始执行,延迟 {delay} 秒")
await asyncio.sleep(delay)
print(f"任务 {name} 执行完成")
return f"任务 {name} 的结果"
async def main():
"""测试异步任务管理器的主协程"""
# 创建任务管理器
manager = AsyncTaskManager()
# 添加多个任务
manager.add_task(sample_task, "A", 1)
manager.add_task(sample_task, "B", 2)
manager.add_task(sample_task, "C", 1.5)
# 并发执行所有任务
print("\n并发执行所有任务:")
results = await manager.run_all()
print(f"所有任务执行完成,结果:{results}")
# 测试超时功能
print("\n测试任务超时功能:")
# 正常完成的任务
result1 = await manager.run_with_timeout(sample_task, 2, "D", 1)
print(f"任务D结果:{result1}")
# 超时的任务
result2 = await manager.run_with_timeout(sample_task, 1, "E", 2)
print(f"任务E结果:{result2}")
# 运行主协程
# 注意:实际运行需要取消下面的注释
# print("\n测试异步任务管理器:")
# asyncio.run(main())💡 实践应用提示
迭代器与生成器的选择:
- 如果你需要自定义迭代行为,并且需要维护复杂的状态,可以实现一个自定义迭代器类
- 对于简单的惰性计算场景,生成器函数或生成器表达式通常是更好的选择,因为它们更简洁、更容易理解
生成器的高级特性应用场景:
send()方法可以用于实现双向通信的生成器,例如状态机、解析器等throw()方法可以用于向生成器中注入异常,实现错误处理或特殊控制流close()方法可以用于清理生成器资源,特别是当生成器包含文件句柄等资源时
协程的实际应用:
- 网络应用:Web服务器、API客户端、WebSocket服务器等
- 数据处理:大规模数据处理、ETL工作流等
- 实时系统:聊天应用、游戏服务器、实时监控系统等
- I/O密集型任务:文件操作、数据库操作、网络请求等
性能优化建议:
- 对于大量数据的处理,优先使用生成器表达式而不是列表推导式
- 对于I/O密集型应用,使用异步编程可以显著提高性能
- 避免在生成器或协程中进行阻塞操作
- 使用
asyncio.gather()并发执行多个独立的异步任务
常见陷阱与解决方案:
- 迭代器只能遍历一次:如果需要多次遍历,创建一个新的迭代器或转换为列表
- 忘记启动生成器:在使用
send()方法之前,需要先调用next()方法启动生成器 - 阻塞异步操作:确保在异步代码中只使用异步版本的库函数
- 过度使用异步:对于CPU密集型任务,多线程或多进程可能是更好的选择
📚 学习资源推荐
官方文档:
经典书籍:
- 《流畅的Python》 by Luciano Ramalho(第14章到第18章详细介绍了迭代器、生成器和协程)
- 《Python Cookbook》 by David Beazley & Brian K. Jones(第4章包含了许多关于迭代器和生成器的实用技巧)
网络资源:
异步编程库:
通过这些练习,你已经深入了解了Python中生成器、迭代器和协程的强大功能,并掌握了如何在实际项目中应用这些特性。这些工具不仅可以使你的代码更高效、更优雅,还能帮助你处理各种复杂的数据处理和异步操作场景。 通过这些练习,你已经深入了解了Python中生成器、迭代器和协程的强大功能,并掌握了如何在实际项目中应用这些特性。这些工具不仅可以使你的代码更高效、更优雅,还能帮助你处理各种复杂的数据处理和异步操作场景。
记住,生成器和迭代器是处理数据流的强大工具,而协程则是构建高性能异步应用的基础。通过不断地练习和实践,你会逐渐掌握这些特性的精髓,编写出更加专业、高效的Python代码。
恭喜你完成了进阶阶段的所有学习内容!在这个阶段,你已经掌握了Python的核心应用和工具链,包括面向对象编程高级特性、异常处理进阶、模块和包管理、函数式编程高级特性以及生成器、迭代器和协程。这些知识将为你进一步深入Python编程打下坚实的基础。
继续加油,享受Python编程的乐趣吧!🚀




