深入理解C++ Workflow源码(1)
第1篇 一个 Task 是如何诞生的很多人第一次接触 Workflow会觉得它“用起来很轻”但真正神奇的地方不在 API 表面而在于一个 Task 从创建、初始化、调度到回调结束几乎把异步编程里最麻烦的几件事都藏起来了。这一篇我们不急着钻 epoll也不急着看协议细节只回答一个问题用户写下WFTaskFactory::create_http_task()之后框架内部到底发生了什么一、先看用户代码Task 是从工厂出来的Workflow 的设计非常明确任务由工厂创建业务由任务流组织。典型代码大概长这样auto*taskWFTaskFactory::create_http_task(http://www.example.com,3,2,[](WFHttpTask*task){if(task-get_state()WFT_STATE_SUCCESS){protocol::HttpResponse*resptask-get_resp();// 处理响应}});task-start();从这段代码里至少能看到四个事实用户并不直接new某个底层任务对象。用户看到的是强类型任务比如WFHttpTask、WFMySQLTask。用户只写 callback不处理线程、fd、连接池。一个任务本身就能start()说明它最终一定能挂进某种统一调度模型。这正是 Workflow 的第一层抽象所有异步操作先被包装成 SubTask再由 Workflow 把它们编排成 Series 或 Parallel。二、Task 的“壳”为什么一切都先变成 SubTask在内核层最底层的抽象不是 HTTP也不是 MySQL而是SubTaskclassSubTask{public:virtualvoiddispatch()0;private:virtualSubTask*done()0;protected:voidsubtask_done();};这个接口很小但语义非常重dispatch()把任务真正投递出去。done()任务结束后告诉框架“下一个该跑谁”。subtask_done()统一推进整个任务流。也就是说Workflow 并不是“回调到回调”的设计而是“当前 SubTask 完成后返回下一个 SubTask”的设计。这和很多传统异步框架最大的不同在于控制流并没有散掉而是被收敛在统一的任务推进器里。三、Task 的“流”为什么 start() 其实是在启动一个 Series无论是线程任务、网络任务还是定时器任务start()的实现都非常像voidstart(){assert(!series_of(this));Workflow::start_series_work(this,nullptr);}这段代码说明了一个非常关键的设计一个独立任务启动时并不是“裸奔”的它会先被包进一个串行流SeriesWork。这样做有三个好处单个任务和复杂工作流共享同一套执行模型。callback 结束后对象销毁、上下文传递、取消语义都能统一。后续动态往 series 里push_back()新任务变得非常自然。换句话说Workflow 不是“任务系统 工作流系统”两套机制而是一套机制从一开始就覆盖了最简单到最复杂的场景。四、Task 工厂到底创建了什么以 HTTP 为例WFTaskFactory暴露的是工厂接口usingWFHttpTaskWFNetworkTaskprotocol::HttpRequest,protocol::HttpResponse;WFHttpTask不是一个具体类而是一个类型别名。真正被创建出来的通常是某个更复杂的实现类例如HttpTaskImpl.cc里的ComplexHttpTaskclassComplexHttpTask:publicWFComplexClientTaskHttpRequest,HttpResponse{protected:virtualCommMessageOut*message_out();virtualCommMessageIn*message_in();virtualintkeep_alive_timeout();virtualboolinit_success();virtualvoidinit_failed();virtualboolfinish_once();};这层实现很重要因为它说明 Workflow 的任务对象并不是“请求包 回调”这么简单而是一个封装了以下能力的复杂状态机URI 解析DNS/路由选择连接建立SSL 握手请求编码响应解码重定向重试keep-alive所以从用户视角看是“一个 HTTP 任务”从框架视角看其实是“一个隐藏了多个异步阶段的复合任务”。五、Task 为什么能统一成几种基类Workflow 把任务大致收敛成三条主线templateclassINPUT,classOUTPUTclassWFThreadTask:publicExecRequest{...};templateclassREQ,classRESPclassWFNetworkTask:publicCommRequest{...};classWFTimerTask:publicSleepRequest{...};这三类分别对应计算任务交给Executor thrdpool网络任务交给CommScheduler Communicator定时器任务交给Communicator的 timer 机制这也是 Workflow 很漂亮的一点上层 API 很丰富但底层入口非常少。丰富的协议和组件最后都会落到少数几种可调度任务上。六、一个 HTTP Task 的诞生流程把源码串起来后一个 HTTP Task 的出生过程大致是下面这样第 1 步用户调用工厂WFTaskFactory::create_http_task(...)工厂负责创建具体实现对象并把 callback、重试次数、重定向次数这些策略参数灌进去。第 2 步具体任务初始化ComplexHttpTask构造时会设置默认方法、版本等基础状态client_req-set_method(HttpMethodGet);client_req-set_http_version(HTTP/1.1);第 3 步start() 包装成 Series任务调用start()后被Workflow::start_series_work()包成一个最小串行流。第 4 步dispatch() 投递到底层调度器如果是网络任务会进入CommRequest::dispatch()if(this-scheduler-request(this,this-object,this-wait_timeout,this-target)0){this-handle(CS_STATE_ERROR,errno);}第 5 步Communicator 驱动整个异步生命周期后面连接、发送、接收、解析、超时、错误处理都由Communicator和poller接管。第 6 步done() 回到 Series任务完成后会执行 callback然后回到 series 取下一个任务if(this-callback)this-callback(this);deletethis;returnseries-pop();这一步非常关键Workflow 不是“回调结束就结束”而是“回调结束后决定工作流下一步怎么走”。七、为什么说 Workflow 的 Task 是“结构化并发单元”很多异步框架里的 task更像是一段“待执行回调”。Workflow 里的 task 则更像一个完整的并发单元因为它天然带着这些能力有明确的生命周期创建到 callback 结束有统一的归属一定属于某个 series有清晰的完成语义通过done()交出控制权有统一的调度入口dispatch()有强类型输入输出请求/响应对象都挂在任务上这使得 Workflow 的任务既能表达“做一次网络请求”也能表达“作为工作流中的一个节点存在”。八、从“一个 Task 的诞生”里能看出什么设计思想读到这里其实已经能看出 Workflow 的几条核心设计哲学1. 用户接口丰富内核抽象极简上层有 HTTP/MySQL/Redis/Timer/FileIO/Graph 等等底层却尽量收敛到SubTaskSeriesWorkCommRequestExecRequest2. 把复杂异步过程隐藏到“一个任务”里HTTP 任务里可能包含 DNS、连接、SSL、重试、重定向但用户只看到一个 Task。3. 通过 series 统一生命周期和控制流单任务和复杂工作流用同一套机制所以没有“简单场景一套逻辑复杂场景另一套逻辑”的割裂感。4. 不把并发模型暴露给业务代码业务代码写 callback 即可线程、fd、epoll、连接复用都由框架兜底。九、这篇先记住三句话如果你第一次读 Workflow 源码我建议先把下面三句话记住Task 的真正基类是SubTask不是 HTTPTask。独立 Task 启动时会先被包进SeriesWork。一个“看起来简单”的任务内部通常是一个复合状态机。理解了这三点后面看SeriesWork、WFNetworkTask、Communicator时脑子里就不会把它们当成互相独立的模块而会把它们看成同一条链路上的不同层。十、下一篇看什么既然 Task 一出生就会被包进SeriesWork那下一个问题自然就是Series 到底是什么为什么 Workflow 要把所有任务都放进一个串行流里下一篇我们就专门拆开SeriesWork看它如何管理队列、推进后继任务、处理取消以及如何跟并行任务拼成更大的工作流。