一、 总体功能概述alloc_spill.py文件实现了一个面向多级存储层次如CPU缓存的、动态的、带溢出Spill操作的内存分配器。其核心任务是给定一个由计算节点如算子及其依赖关系构成的计算图Graph以及一个节点的执行顺序order该分配器会为图中声明的数据缓冲区Buffer在容量有限的多级存储如L1缓存、统一缓存UB、L0A/B/C等中分配空间。当某级存储空间不足时它会智能地选择将部分已分配的数据“溢出”SPILL_OUT到更下一级存储通常是主存并在未来需要时再“载入”SPILL_IN回来从而确保计算流程能在有限的硬件资源下正确执行。该模块的核心输出包括初始分配方案​ (initial_allocs)每个缓冲区在各自存储类型t下的起始偏移量 (offset) 和大小 (size)。Spill操作序列​ (spills)一系列需要在原计算流程中插入的SPILL_OUT写出和SPILL_IN读入操作及其元信息。新的节点执行序列​ (new_order)在原始执行顺序中插入了必要的Spill操作节点后得到的新调度顺序。简而言之这是一个编译器后端或高性能计算调度器中用于解决“寄存器分配”或“缓存着色”问题的关键组件但其抽象层级更高适用于多级、异构的存储单元。二、 关键数据结构详解文件定义和使用了多个核心数据结构来建模问题。1. 常量与配置CACHE_CAP: 一个字典定义了不同层级或类型存储器的容量单位推测为字节。例如“L1”: 4096表示L1缓存大小为4096字节。这构成了分配器的硬件约束。SPILL_PARAMS: 一个字典用于估算Spill操作的开销。cycles_per_byte表示每字节传输的周期数fixed_latency表示每次Spill操作的固定延迟周期。这在估算Spill代价和节点cycles属性时使用。2. 数据类 (Dataclasses)使用dataclass装饰器定义了两个轻量级的数据结构SpillOp: 描述一次完整的“溢出-重载”操作对。bufid(int): 被Spill的缓冲区唯一标识。new_offset(int): 该缓冲区在Spill-In后在新存储位置分配的偏移量。值为-1表示溢出后未被立即重分配可能需要后续再次分配。out_node(int): 对应的SPILL_OUT操作在计算图g中的节点ID。in_node(int): 对应的SPILL_IN操作在计算图g中的节点ID。Allocation: 描述一个缓冲区在存储空间中的位置信息。offset(int): 缓冲区起始偏移地址。size(int): 缓冲区大小。3. 核心函数内部维护的数据结构在核心函数plan_spills_and_allocate内部维护着几个关键的状态字典和列表它们是算法运行的“工作记忆”free_lists​ (Dict[str, List[Tuple[int, int]]]): 为每一种存储类型t维护一个空闲区间列表。每个区间是一个元组(start, end)表示从start到end含两端的连续空间是空闲的。列表始终保持合并后的有序状态无重叠且有序。这是实现动态分配的基础。live​ (Dict[str, Dict[int, Allocation]]): 记录每一种存储类型t中当前活跃已分配且未释放的缓冲区。它是一个双层字典外层键是存储类型t内层键是缓冲区IDbufid值是Allocation对象。这代表了当前的“内存占用量”。uses_positions​ (Dict[int, deque]): 记录每个缓冲区bufid在调度顺序order中所有使用点包括FREE操作的位置索引。deque双端队列的数据结构便于在遍历顺序时不断弹出已过去的、最近的使用点从而快速获取“下一次使用距离”next_use_distance。pending_insertions​ (defaultdict[int, List[Tuple[int, SpillOp]]]): 一个暂存字典键是调度顺序中的位置索引值是在该位置需要插入的SPILL_IN节点ID及其对应的SpillOp对象。这是因为SPILL_IN操作必须在缓冲区下一次被使用之前插入而这个“下一次使用”的位置在遍历到SPILL_OUT点时是已知的但实际插入操作需要延后到遍历到那个位置时进行。三、 核心算法流程深度剖析算法的执行入口是plan_spills_and_allocate(g, order, cache_cap)函数。以下是其逐步的、详细的逻辑拆解。第零步初始化与预处理初始化存储状态为cache_cap中的每一种存储类型t初始化其free_lists为[(0, cap-1)]即整个空间都空闲初始化live[t]为空字典。构建“使用位置”映射遍历计算图g中每个缓冲区的使用信息g.uses_of_buf和释放节点g.free_of_buf计算出每个缓冲区在所有调度节点order中出现的位置索引列表并存入uses_positions。这一步是后续计算“下一次使用距离”的关键准备。第一步主循环——按调度顺序遍历所有节点算法按照用户给定的初始调度顺序order逐个处理节点nid。这是模拟指令/算子执行流程的过程。处理待插入的SPILL_IN节点在处理当前节点nid之前先检查pending_insertions中是否有计划在当前索引idx插入的SPILL_IN节点。如果有则先将这些SPILL_IN节点加入新的执行序列new_order。这保证了Spill-In操作发生在它所属缓冲区的下一次使用点之前。根据节点类型进行相应处理A. ALLOC 节点空间确保 (ensure_space)这是算法的核心决策点。当要为缓冲区b在存储类型t上分配sz大小的空间时首先调用ensure_space(t, sz, idx)。该函数会检查当前t的空闲列表中最大连续块是否足够sz。如果不足则会触发一个循环直到空间足够或无法再释放为止。循环中选择被Spill的受害者遍历当前t中所有活跃缓冲区(live[t])根据一个启发式评分score​ 选择最“适合”溢出的一个。评分公式为score (alloc.size * dist_eff) / (spill_cost_bytes 1e-9)alloc.size: 候选缓冲区的大小。dist_eff: 候选缓冲区“下一次使用距离”next_pos - now_index如果之后不再使用则为无穷大math.inf。距离越大意味着溢出后可以越久不用载回理论上更优。代码用max(dist,1)和1e9处理边界。spill_cost_bytes: 溢出该缓冲区的数据移动开销字节数。如果该缓冲区后续有COPY_IN操作可能表示只读或特殊处理则开销为size只需写出一次否则为2*size需要先写出再读入。公式解读该评分倾向于选择“体积大、下次使用远、且溢出成本低”​ 的缓冲区进行Spill。这是一个权衡了“腾出空间效率”size/dist_eff和“操作成本”spill_cost_bytes的经典启发式策略。执行Spill选中受害者best_b后将其从live[t]移除并将其占用的空间通过free_interval归还给free_lists[t]。在计算图g中创建一个SPILL_OUT节点设置其计算周期(cycles)为spill_cycles(alloc.size)。并建立正确的前后依赖边从其分配节点(ALLOC)指向它从已执行的、使用该缓冲区的最后一个节点指向它保证写出的数据是最新的。规划Spill-In如果该缓冲区未来还会被使用uses_positions中队列非空则需要规划对应的SPILL_IN。计算SPILL_IN后缓冲区的新位置(new_offset)。如果当前有足够空间则立即分配否则置为-1等待后续分配。创建SpillOp记录并创建一个SPILL_IN节点。将SPILL_IN节点加入pending_insertions字典键为其下一次使用的位置(next_use_pos)。这实现了Spill-In操作的延迟插入。更新依赖建立SPILL_OUT - SPILL_IN - FREE的依赖边。如果Spill-In时分配了新位置(new_offset ! -1)则立即更新live[t]将该缓冲区以新偏移重新激活。B. FREE 节点找到缓冲区b在live[t]中对应的Allocation。将其从live[t]中移除。调用free_interval将其占用的空间(offset,size)合并到free_lists[t]的空闲区间中。这模拟了内存的释放。C. 其他计算节点 (如 COPY_IN, COMPUTE 等)不进行任何分配或释放操作只是按顺序添加到new_order中继续执行流程。空间分配在ensure_space确保有足够连续空间后调用best_fit_allocate在free_lists[t]中寻找一个大小大于等于请求大小sz的最小空闲块进行分配最佳适应算法。如果分配成功更新free_lists在live[t]中记录该缓冲区的分配信息(Allocation)并将此初始分配记录到initial_allocs中。如果因碎片化导致best_fit_allocate返回None代码有一个强制清空当前存储层并重新分配的后备策略if off is None:后的循环这保证了分配的强制性但可能引入额外的Spill。第二步收尾工作主循环结束后可能还有一些计划在原始order序列末尾之后插入的SPILL_IN节点tail_positions。将这些节点追加到new_order的末尾。第三步返回结果函数返回三个核心结果initial_allocs各缓冲区的初始分配位置、spills产生的所有Spill操作记录、new_order插入了Spill节点后的完整执行顺序。四、 辅助函数分析best_fit_allocate(free_list, size):功能在有序且合并后的空闲区间列表free_list中使用最佳适应Best-Fit​ 策略分配一个大小为size的连续空间。算法遍历所有空闲区间(s, e)选择长度大于等于size且长度最小的区间。找到后从该区间头部切分出size大小的空间更新区间为(ssize, e)或删除该区间并返回分配起点的s。若找不到则返回None。优点有助于减少外部碎片更有效地利用较小的空闲块。free_interval(free_list, start, size):功能释放从start开始、长度为size的区间并将其与free_list中现有的空闲区间合并保持列表有序且无重叠。算法先将新区间(a, b)加入列表并排序。然后进行一次线性遍历合并如果当前区间与结果列表最后一个区间不连续s merged[-1][1] 1则作为新区间追加否则将最后一个区间的结束位置扩展为max(merged[-1][1], e)。largest_free_segment(free_list):一个简单的工具函数返回空闲列表中最大连续空闲块的大小。用于ensure_space中快速判断当前空间是否满足分配需求。spill_cycles(size):根据SPILL_PARAMS计算Spill操作的周期数模型化为固定延迟 每字节开销 * 数据量。spill_extra_bytes(g, spills):功能计算所有Spill操作引起的额外数据移动总量字节数。这用于评估Spill策略的开销。逻辑首先从计算图中收集每个缓冲区的大小。然后判断每个缓冲区是否有COPY_IN操作有则has_copyinTrue。最后遍历所有spills如果该缓冲区的Spill是COPY_IN类型则只计算一次大小size仅写出否则计算两次2*size写出读入。累加得到总字节数。五、 算法特点与设计思想总结在线Online与启发式决策算法在模拟指令执行的过程中动态做出分配和Spill决策而非基于全局信息的离线优化。其Spill选择策略是一个精心设计的启发式函数平衡了空间收益、时间局部性和操作成本。延迟插入Lazy InsertionSPILL_IN节点并不在SPILL_OUT产生的时刻立即插入序列而是记录其应该出现的位置下一次使用前待到遍历到该位置时才插入。这简化了遍历过程的逻辑并自然地保证了依赖关系的正确性。依赖关系维护算法不仅管理存储空间还负责更新计算图g的节点和边在SPILL_OUT、SPILL_IN、ALLOC、FREE及相关计算节点之间建立正确的数据依赖边保证了程序语义的正确性。支持多级存储通过cache_cap字典和以存储类型t为键的数据结构该算法框架可以同时管理多个独立且容量不同的存储层级如L0A, L0B, L1等为异构内存系统提供支持。基于图的抽象算法强依赖于输入的Graph结构该结构抽象了计算和数据流。这使得算法的适用范围很广不局限于特定类型的计算只要是能表示为节点操作和边依赖的计算流程均可适用。总结alloc_spill.py实现了一个在编译器和硬件调度领域非常核心的、用于解决有限存储资源分配问题的模块。它通过模拟指令流、动态管理空闲空间、并在空间不足时基于启发式规则触发数据在存储层级间的移动Spill最终输出一个可行的内存分配方案以及一个插入了必要数据移动操作的新指令序列。其代码结构清晰将空间管理、Spill决策、图更新等关注点进行了有效的分离是一个设计精良的算法工程实现。源代码import math from collections import defaultdict, deque from dataclasses import dataclass from typing import Dict, List, Tuple, Optional, Set from .graph_io import Graph, Node from .scheduling import compute_Mmax_for_order CACHE_CAP { L1: 4096, UB: 1024, L0A: 256, L0B: 256, L0C: 512, } SPILL_PARAMS {cycles_per_byte:1, fixed_latency:0} dataclass class SpillOp: bufid: int new_offset: int out_node: int in_node: int dataclass class Allocation: offset: int size: int def best_fit_allocate(free_list: List[Tuple[int,int]], size: int) - Optional[int]: best_idxNone; best_lenNone; best_startNone for i,(s,e) in enumerate(free_list): lengthe-s1 if lengthsize and (best_len is None or lengthbest_len): best_lenlength; best_idxi; best_starts if best_idx is None: return None s,e free_list[best_idx] new_start ssize if new_starte: free_list[best_idx](new_start,e) else: free_list.pop(best_idx) return s def free_interval(free_list: List[Tuple[int,int]], start: int, size: int): astart; bstartsize-1 free_list.append((a,b)); free_list.sort() merged[] for s,e in free_list: if not merged or smerged[-1][1]1: merged.append((s,e)) else: merged[-1](merged[-1][0], max(merged[-1][1], e)) free_list[:] merged def plan_spills_and_allocate(g: Graph, order: List[int], cache_cap: Dict[str,int]CACHE_CAP): free_lists {t:[(0,cap-1)] for t,cap in cache_cap.items()} live {t:{} for t in cache_cap} # type - {bufid: Allocation} uses_positions {} for b, lst in g.uses_of_buf.items(): poslist [i for i,nid in enumerate(order) if nid in set(lst) or nidg.free_of_buf.get(b)] poslist sorted(set(poslist)) uses_positions[b] deque(poslist) initial_allocs {} spills: List[SpillOp] [] new_order: List[int] [] pending_insertions defaultdict(list) # schedule_index - list of (in_node_id, SpillOp) def largest_free_segment(free_list: List[Tuple[int,int]]) - int: if not free_list: return 0 return max(e-s1 for s,e in free_list) def spill_cycles(size:int)-int: return int(SPILL_PARAMS[fixed_latency] SPILL_PARAMS[cycles_per_byte]*(size or 0)) def ensure_space(t: str, need: int, now_index: int): while largest_free_segment(free_lists[t]) need: best_bNone; best_score-1.0 for b, alloc in live[t].items(): dq uses_positions.get(b, deque()) next_pos dq[0] if dq else math.inf dist (next_pos - now_index) if next_pos ! math.inf else math.inf has_copyin any(g.nodes[nid].opCOPY_IN for nid in g.uses_of_buf.get(b, [])) spill_cost_bytes alloc.size if has_copyin else (2*alloc.size) dist_eff (1e9 if distmath.inf else max(dist,1)) score (alloc.size * dist_eff) / (spill_cost_bytes 1e-9) if scorebest_score: best_scorescore; best_bb if best_b is None: break alloc live[t].pop(best_b) free_interval(free_lists[t], alloc.offset, alloc.size) out_id g.next_node_id; g.next_node_id1 g.nodes[out_id] Node(idout_id, opSPILL_OUT, pipeMTE3, cyclesspill_cycles(alloc.size), bufs[best_b]) a_id g.alloc_of_buf[best_b] g.edges_out.setdefault(a_id,set()).add(out_id); g.edges_in.setdefault(out_id,set()).add(a_id) executed_nodes [nid for nid in g.uses_of_buf.get(best_b, []) if nid in new_order] if executed_nodes: last_exec executed_nodes[-1] g.edges_out.setdefault(last_exec,set()).add(out_id); g.edges_in.setdefault(out_id,set()).add(last_exec) dq uses_positions.get(best_b, deque()) if dq: next_use_pos dq[0] in_id g.next_node_id; g.next_node_id1 if largest_free_segment(free_lists[t]) alloc.size: new_offset best_fit_allocate(free_lists[t], alloc.size) else: new_offset None sp SpillOp(bufidbest_b, new_offset(new_offset if new_offset is not None else -1), out_nodeout_id, in_nodein_id) spills.append(sp) g.nodes[in_id] Node(idin_id, opSPILL_IN, pipeMTE2, cyclesspill_cycles(alloc.size), bufs[best_b]) g.edges_out.setdefault(out_id,set()).add(in_id); g.edges_in.setdefault(in_id,set()).add(out_id) f_id g.free_of_buf[best_b] g.edges_out.setdefault(in_id,set()).add(f_id); g.edges_in.setdefault(f_id,set()).add(in_id) if new_offset is not None: live[t][best_b] Allocation(offsetnew_offset, sizealloc.size) pending_insertions[next_use_pos].append((in_id, sp)) else: f_id g.free_of_buf[best_b] g.edges_out.setdefault(out_id,set()).add(f_id); g.edges_in.setdefault(f_id,set()).add(out_id) for idx, nid in enumerate(order): if idx in pending_insertions: for in_id, _ in pending_insertions[idx]: new_order.append(in_id) n g.nodes[nid] if n.op ALLOC: b n.bufid; tn.type; szint(n.size or 0) ensure_space(t, sz, idx) off best_fit_allocate(free_lists[t], sz) if off is None: for bb, alloc in list(live[t].items()): live[t].pop(bb) free_interval(free_lists[t], alloc.offset, alloc.size) off best_fit_allocate(free_lists[t], sz) if off is None: off 0 live[t][b] Allocation(offsetoff, sizesz) initial_allocs[(t,b)] Allocation(offsetoff, sizesz) new_order.append(nid) elif n.op FREE: b n.bufid; tn.type alloc live[t].pop(b, None) if alloc is not None: free_interval(free_lists[t], alloc.offset, alloc.size) new_order.append(nid) else: new_order.append(nid) tail_positions sorted([k for k in pending_insertions.keys() if klen(order)]) for k in tail_positions: for in_id, _ in pending_insertions[k]: new_order.append(in_id) return initial_allocs, spills, new_order def spill_extra_bytes(g: Graph, spills: List[SpillOp]) - int: total0 buf_size {} for nid,n in g.nodes.items(): if n.opALLOC and n.bufid is not None: buf_size[n.bufid]int(n.size or 0) if n.opFREE and n.bufid is not None: buf_size.setdefault(n.bufid, int(n.size or 0)) has_copyin set() for b, uses in g.uses_of_buf.items(): for nid in uses: if g.nodes[nid].opCOPY_IN: has_copyin.add(b); break for sp in spills: size buf_size.get(sp.bufid, 0) total (size if sp.bufid in has_copyin else 2*size) return total