Python数据流优化
Python 数据流优化 使用生成器、迭代器和惰性求值处理大规模数据 数据流优化是一种内存高效的数据处理方式。通过生成器管道、迭代器链和惰性求值可以处理远超内存容量的数据集。import itertoolsimport timeimport sys# 1. 生成器基础内存效率对比 def load_all_data(n: int) - list:传统方式一次性将所有数据加载到内存。当 n 很大时会导致 MemoryError。return [i ** 2 for i in range(n)]def generate_data(n: int):生成器方式逐个产生数据几乎不占内存。使用 yield 关键字创建生成器函数。for i in range(n):yield i ** 2def compare_memory():对比列表和生成器的内存占用。生成器只保存当前状态内存开销恒定。n 100000# 列表方式start_mem sys.getsizeof([])list_data load_all_data(n)list_mem sys.getsizeof(list_data)print(f列表占用内存: {list_mem / 1024:.2f} KB)# 生成器方式gen generate_data(n)gen_mem sys.getsizeof(gen)print(f生成器占用内存: {gen_mem} bytes)print(f内存差异: {list_mem / max(gen_mem, 1):.0f}x 倍)# 2. 生成器管道链式数据处理 def read_lines(filename: str):管道第一步逐行读取文件。即使文件有几十 GB也只会占用一行内存。with open(filename, r, encodingutf-8) as f:for line in f:yield line.strip()def filter_lines(lines, keyword: str):管道第二步过滤包含关键字的行。接收生成器返回生成器构成惰性管道。for line in lines:if keyword in line:yield linedef transform_lines(lines):管道第三步对每行进行变换。每个管道步骤都是独立的生成器函数。for line in lines:# 转换为大写并添加前缀yield f[PROCESSED] {line.upper()}def count_words_in_lines(lines):管道第四步统计每行的单词数。管道的末端通常是一个消费数据的循环。for line in lines:word_count len(line.split())yield (line, word_count)def demo_pipeline():演示完整的生成器管道。数据在管道中流动每个步骤只处理一个元素。print( 生成器管道示例 )# 模拟数据实际应用中可能是大文件test_data [apple banana cherry,dog elephant fox,python is great,apple pie is delicious,zip zap zop,]# 保存测试文件with open(_test_pipeline.txt, w) as f:for line in test_data:f.write(line \n)# 构建处理管道lines read_lines(_test_pipeline.txt)filtered filter_lines(lines, apple)transformed transform_lines(filtered)counted count_words_in_lines(transformed)# 消费管道中的数据for line, count in counted:print(f [{count} 词] {line})# 3. itertools 工具集 def demo_itertools():itertools 提供了丰富的流式数据处理工具。所有函数都返回迭代器惰性求值。print( itertools 流式处理 )# chain: 将多个迭代器串联data1 [1, 2, 3]data2 [4, 5, 6]chained itertools.chain(data1, data2)print(f chain: {list(chained)})# islice: 对无限生成器切片def counter():i 0while True:yield ii 1sliced itertools.islice(counter(), 5, 10)print(f islice(5..10): {list(sliced)})# takewhile / dropwhile: 条件截取data [1, 3, 5, 7, 2, 4, 6, 8]taken itertools.takewhile(lambda x: x 5, data)dropped itertools.dropwhile(lambda x: x 5, data)print(f takewhile(5): {list(taken)})print(f dropwhile(5): {list(dropped)})# groupby: 流式分组要求数据已排序data [(1, a), (1, b), (2, c), (2, d)]for key, group in itertools.groupby(data, keylambda x: x[0]):print(f groupby({key}): {list(group)})# zip_longest: 不等长压缩a [1, 2, 3]b [x, y]zipped itertools.zip_longest(a, b, fillvalueNone)print(f zip_longest: {list(zipped)})# accumulate: 累积计算values [1, 2, 3, 4, 5]accumulated itertools.accumulate(values)print(f accumulate: {list(accumulated)})# product: 笛卡尔积可替代嵌套循环colors [red, blue]sizes [S, M, L]products itertools.product(colors, sizes)print(f product: {list(products)})# 4. 惰性求值在大数据中的应用 def lazy_evaluation_demo():惰性求值允许处理理论上无限的数据流。只计算需要的那部分数据。print( 惰性求值示例 )# 无限斐波那契数列生成器def fibonacci():a, b 0, 1while True:yield aa, b b, a b# 获取前 20 个斐波那契数fib fibonacci()first_20 list(itertools.islice(fib, 20))print(f 前 20 个斐波那契数: {first_20})# 找到第一个大于 1000 的斐波那契数fib fibonacci()for num in fib:if num 1000:print(f 第一个大于 1000 的斐波那契数: {num})break# 惰性筛选大文件中的匹配行# 以下操作不会一次性读取整个文件def find_pattern(filename, pattern):with open(filename) as f:for line in f:if pattern in line:yield line# 只找到前 5 个匹配就停止yield from itertools.islice(find_pattern(_test_pipeline.txt, apple), 5)matches list(itertools.islice(filter_lines(read_lines(_test_pipeline.txt), apple),5))print(f 惰性匹配到 {len(matches)} 行包含 apple)# 5. 迭代器链式操作 def demo_iterator_chain():使用函数式编程风格链式处理数据流。避免创建中间列表减少内存使用。print( 迭代器链式操作 )# 传统方式多个中间列表numbers range(100000)squares [x ** 2 for x in numbers]even_squares [x for x in squares if x % 2 0]result_trad sum(even_squares[:1000])# 迭代器链无中间列表result_chain sum(itertools.islice((x ** 2 for x in range(100000)if (x ** 2) % 2 0),1000))print(f 传统方式结果: {result_trad})print(f 迭代器链结果: {result_chain})# 更复杂的链式处理data range(10000)pipeline ((x * 2 for x in data) # 变换if x 5 for x in data # 筛选)# 上述是错误的语法修正如下pipeline (x * 2 for x in data if x 5)result sum(itertools.islice(pipeline, 100))print(f 链式流水线结果: {result})# 6. 实战流式日志分析 def stream_log_analysis():模拟流式分析大规模日志文件。演示如何使用生成器管道处理实时数据流。print( 流式日志分析 )import randomimport datetime# 模拟日志生成器def log_generator(n: int):levels [INFO, WARN, ERROR, DEBUG]services [api, web, db, cache]for _ in range(n):timestamp datetime.datetime.now().isoformat()level random.choice(levels)service random.choice(services)message f{timestamp} [{level}] {service}: 请求处理中...yield message# 实时过滤分析def filter_errors(logs):for log in logs:if ERROR in log:yield logdef extract_service(logs):for log in logs:parts log.split()service parts[3].rstrip(:)yield service# 流式消费logs log_generator(10000)error_logs filter_errors(logs)services extract_service(error_logs)# 统计各服务错误次数error_counts {}for svc in services:error_counts[svc] error_counts.get(svc, 0) 1print(f 各服务错误统计: {error_counts})# 使用 collections.Counter 替代from collections import Counterlogs log_generator(10000)error_services extract_service(filter_errors(logs))counts Counter(error_services)print(f Counter 统计: {dict(counts)})# 性能测试比较不同方式 def benchmark_streaming():对比传统列表方式和流式处理的内存和时间开销。print( 流式处理性能对比 )n 1000000# 列表方式start time.perf_counter()result_list sum([x ** 2 for x in range(n)])t_list time.perf_counter() - start# 生成器方式start time.perf_counter()result_gen sum(x ** 2 for x in range(n))t_gen time.perf_counter() - startprint(f 列表 sum: {t_list:.4f}s (结果: {result_list}))print(f 生成器 sum: {t_gen:.4f}s (结果: {result_gen}))print(f 时间差异: {t_list / t_gen:.2f}x)if __name__ __main__:print( * 50)print(Python 数据流优化)print( * 50)compare_memory()print()demo_pipeline()print()demo_itertools()print()lazy_evaluation_demo()print()demo_iterator_chain()print()stream_log_analysis()print()benchmark_streaming()# 清理测试文件import osfor f in [_test_pipeline.txt]:if os.path.exists(f):os.remove(f)