ZLMediaKit 源码分析(六):RTMP 协议处理与流转发机制
本文从 RTMP 握手开始深入分析 Chunk 协议解析、推流消息处理、拉流数据分发追踪到 Socket 层的读写操作。1. RTMP 协议概述RTMPReal-Time Messaging Protocol是 Adobe 设计的流媒体传输协议基于 TCP默认端口 1935握手分为 Simple Handshake 和 Complex Handshake数据以 Message 为单位通过 Chunk 分片传输支持 AMF0/AMF3 两种序列化格式2. RtmpSession 类classRtmpSession:publicTcpSession,publicRtmpSplitter,publicMediaSourceEvent{// RTMP 完整会话RtmpMediaSource::Ptr _push_src;// 推流时的媒体源RingBufferRtmpPacket::Reader::Ptr _ring_reader;// 拉流 Reader};3. RTMP 握手3.1 状态机Client Server │ │ ├── C0 (1 byte) ─────────→│ version ├── C1 (1536 bytes) ────→│ time random │ │ │←── S0 (1 byte) ────────┤ version │←── S1 (1536 bytes) ────┤ time random │ │ │←── S2 (1536 bytes) ────┤ echo C1 ├── C2 (1536 bytes) ────→│ echo S1 │ │ │ Handshake Done │3.2 Simple Handshake 实现voidRtmpSession::onRecv(constBuffer::Ptrbuf){_alive_ticker.resetTime();if(_handshake_statekHandshakeUnstarted){// 第一阶段C0 C1handleC0C1(buf);}elseif(_handshake_statekHandshakeS1Sent){// 第三阶段C2handleC2(buf);}else{// 握手完成解析 ChunkRtmpSplitter::onRecv(buf);}}3.3 handleC0C1voidRtmpSession::handleC0C1(constBuffer::Ptrbuf){if(buf-size()11536)return;// 等待更多数据// C0: versionuint8_tversionbuf-data()[0];if(version!3){shutdown(SockException(Err_other,unsupported version));return;}// C1: time(4) zero(4) random(1528)constuint8_t*c1buf-data()1;uint32_tc1_timentohl(*(uint32_t*)c1);// 尝试 Complex Handshake检查 C1 是否为 Flash 格式if(isComplexHandshake(c1)){handleComplexHandshake(c1);return;}// Simple Handshake回复 S0 S1 S2chars0s1s2[115361536];// S0s0s1s2[0]3;// S1: time zero random*(uint32_t*)(s0s1s21)htonl(time(nullptr));*(uint32_t*)(s0s1s25)0;// ... fill random// S2: echo C1memcpy(s0s1s211536,c1,1536);_sock-send(s0s1s2,sizeof(s0s1s2));_handshake_statekHandshakeS1Sent;}_sock-send()调用链Socket::send(buf, len) → ::send(fd, data, len, MSG_NOSIGNAL) 系统调用3.4 handleC2voidRtmpSession::handleC2(constBuffer::Ptrbuf){// C2: echo S1不验证内容直接进入 chunk 阶段_handshake_statekHandshakeDone;// 后续数据按 Chunk 协议解析}4. RTMP Chunk 协议4.1 Chunk 格式0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 -------------------------------- |fmt(2) | cs id (6) | -------------------------------- | message type id | message length | -------------------------------- | timestamp | -------------------------------- | stream id | -------------------------------- | chunk data | --------------------------------4.2 RtmpSplitter — Chunk 分割解析classRtmpSplitter:publicHttpRequestSplitter{voidonRecvHeader(constchar*data,size_t len)override{// 解析 Chunk HeaderonRtmpChunk(data,len);}};4.3 Chunk 解析核心voidRtmpSession::onRtmpChunk(constRtmpPacket::Ptrpkt){switch(pkt-header.type_id){caseRTMP_MSG_SET_CHUNK_SIZE:handleSetChunkSize(pkt);break;caseRTMP_MSG_WINDOW_ACK_SIZE:handleWindowAckSize(pkt);break;caseRTMP_MSG_ACK:break;caseRTMP_MSG_CMD:// AMF0 CommandcaseRTMP_MSG_CMD1:// AMF3 CommandhandleCmd(pkt);break;caseRTMP_MSG_AUDIO:handleAudio(pkt);break;caseRTMP_MSG_VIDEO:handleVideo(pkt);break;caseRTMP_MSG_DATA:caseRTMP_MSG_DATA1:handleData(pkt);break;}}5. RTMP 推流流程5.1 流程总览推流端 ZLMediaKit │ │ ├── Handshake ────────────→│ C0/C1 → S0/S1/S2 → C2 │ │ ├── connect ──────────────→│ AMF0 Command: connect │←── _result ─────────────┤ │ │ ├── createStream ─────────→│ AMF0 Command: createStream │←── _result (stream_id) ─┤ │ │ ├── publish ──────────────→│ AMF0 Command: publish │ │ 创建 RtmpMediaSource │←── onStatus(NetStream) ─┤ │ │ ├── Audio/Video Data ─────→│ RTMP Audio/Video Message │ │ → RtmpMediaSource::onWrite() │ │ → RingBuffer::write() │ │ → MultiMediaSourceMuxer 分发5.2 connect 处理voidRtmpSession::handleCmd_connect(AMFDecoderdec){// 1. 解析 connect 参数autoparamsdec.loadObject();autoappparams[app].as_string();autotcUrlparams[tcUrl].as_string();// 2. 设置 Window Ack Size告诉客户端 ack 间隔sendWindowAckSize(kWindowAckSize);// 3. 设置 Peer BandwidthsendPeerBandwidth(kPeerBandwidth);// 4. 设置 Chunk SizesendSetChunkSize(kDefaultChunkSize);// 5. 回复 _resultAMFEncoder enc;enc.onString(_result);enc.onObject({{fmsVer,FMS/3,0,1,123}});enc.onObject({{level,status},{code,NetConnection.Connect.Success}});sendResponse(RTMP_MSG_CMD,enc.data());}5.3 publish 处理voidRtmpSession::handleCmd_publish(AMFDecoderdec){// 1. 解析 publish 参数autostream_namedec.loadString();autostream_typedec.loadString();// live / record / append// 2. 鉴权onCheckMediaAuth(stream_name);// 3. 创建 RtmpMediaSource_push_srcstd::make_sharedRtmpMediaSource(_vhost,_app,stream_name);// 4. 创建 MultiMediaSourceMuxer_muxerstd::make_sharedMultiMediaSourceMuxer(_vhost,_app,stream_name);// 5. 回复 onStatussendOnStatus(NetStream.Publish.Start);}5.4 音视频数据处理voidRtmpSession::handleVideo(constRtmpPacket::Ptrpkt){if(!_push_src)return;autodatapkt-buffer-data();autolenpkt-buffer-size();// RTMP Video Message 格式:// [1 byte flags] [payload]// flags: upper 4 bits codec(7AVC), lower 4 bits frame type(1key)uint8_tflagsdata[0];boolis_key(flags0x0F)1;if((flags4)7){// H264/AVCuint8_tavc_packet_typedata[1];if(avc_packet_type0){// AVC Sequence Header (SPS PPS)_push_src-onWrite(makeFrame(CodecH264,data,len,true));}elseif(avc_packet_type1){// AVC NALU_push_src-onWrite(makeFrame(CodecH264,data,len,is_key));}}}voidRtmpSession::handleAudio(constRtmpPacket::Ptrpkt){if(!_push_src)return;autodatapkt-buffer-data();uint8_tflagsdata[0];// RTMP Audio Message 格式:// [1 byte flags] [payload]// flags: upper 4 bits codec(10AAC), lower 4 bits ...if((flags4)10){// AACuint8_taac_packet_typedata[1];if(aac_packet_type0){// AAC Sequence Header (AudioSpecificConfig)_push_src-onWrite(makeFrame(CodecAAC,data,len,true));}else{// AAC Raw Data_push_src-onWrite(makeFrame(CodecAAC,data,len,false));}}}6. RTMP 拉流流程6.1 流程总览拉流端 ZLMediaKit │ │ ├── Handshake ────────────→│ ├── connect ──────────────→│ │←── _result ─────────────┤ ├── createStream ─────────→│ │←── _result ─────────────┤ │ │ ├── play ─────────────────→│ 查找 RtmpMediaSource │ │ 注册 RingBuffer Reader │←── onStatus(NetStream) ─┤ │ │ │←── Audio/Video Data ────┤ 从 RingBuffer 读取 → 发送 │←── Audio/Video Data ────┤ 持续发送...6.2 play 处理voidRtmpSession::handleCmd_play(AMFDecoderdec){autostream_namedec.loadString();// 1. 查找 MediaSourceautosrcRtmpMediaSource::find(_vhost,_app,stream_name);if(!src){sendOnStatus(NetStream.Play.StreamNotFound);return;}// 2. 注册 RingBuffer Reader_ring_readersrc-createReader();_ring_reader-setReadCB([this](constRtmpPacket::Ptrpkt){// 发送 RTMP MessagesendRtmpPacket(pkt);});// 3. 发送缓存的 MetaData AVC Header AAC Headerautometasrc-getMetaData();if(meta){sendResponse(RTMP_MSG_DATA,meta);}autovideo_headersrc-getVideoHeader();if(video_header){sendResponse(RTMP_MSG_VIDEO,video_header);}autoaudio_headersrc-getAudioHeader();if(audio_header){sendResponse(RTMP_MSG_AUDIO,audio_header);}// 4. 回复 onStatussendOnStatus(NetStream.Play.Start);}6.3 发送 RTMP 数据voidRtmpSession::sendRtmpPacket(constRtmpPacket::Ptrpkt){// 将 RtmpMessage 封装成 Chunk 格式发送autobufferRtmpChunkEncoder::encode(pkt,_chunk_size);_sock-send(buffer);}Chunk 编码调用链sendRtmpPacket() → RtmpChunkEncoder::encode() → 根据 cs_id 和 fmt 类型写 Chunk Header → 按 chunk_size 分片默认 128 字节 → Socket::send(buffer) → ::send(fd, data, len, MSG_NOSIGNAL)7. RTMP Chunk 编码详解7.1 Chunk 分片当 Message 长度超过 chunk_size 时需要分片Message: [Header] [Data 0..chunk_size-1] [Data chunk_size..2*chunk_size-1] ... ↓ ↓ ↓ Type 0 Type 3 Type 3 (Full Header) (Only cs_id) (Only cs_id)7.2 编码实现classRtmpChunkEncoder{staticBuffer::Ptrencode(constRtmpPacket::Ptrpkt,uint32_tchunk_size){BufferLikeString buffer;autodatapkt-buffer-data();autolenpkt-buffer-size();// 第一个 ChunkType 0 HeaderwriteChunkHeader(buffer,0,pkt-header);// 写数据和后续分片size_t offset0;while(offsetlen){autowrite_lenmin(chunk_size,len-offset);buffer.append(dataoffset,write_len);offsetwrite_len;if(offsetlen){// 后续 ChunkType 3 Header仅 cs_idwriteChunkHeader(buffer,3,pkt-header);}}returnstd::make_sharedBufferLikeString(std::move(buffer));}};8. Ack 机制8.1 Window Ack SizevoidRtmpSession::handleWindowAckSize(constRtmpPacket::Ptrpkt){_window_ack_sizepkt-loadInt32();}// 每收到 _window_ack_size 字节数据后发送 AckvoidRtmpSession::checkAck(){_bytes_received_last_pkt_size;if(_bytes_received_window_ack_size_last_ack_bytes){sendAck(_bytes_received);_last_ack_bytes_bytes_received;}}9. 完整数据流图9.1 推流推流端 TCP 数据到达 → epoll_wait → EPOLLIN → Socket::onRead() → recv() → RtmpSession::onRecv() → 握手阶段: handleC0C1/handleC2 → Chunk 阶段: → RtmpSplitter::onRecv() — Chunk 分割 → onRtmpChunk() → handleVideo/handleAudio() → RtmpMediaSource::onWrite(rtmp_pkt) → RingBuffer::write() → 通知所有 Reader → 各协议 Muxer 输出9.2 拉流RingBuffer::write() [推流端写入] → RingReaderDispatcher::onRead() → 拉流端 Reader 回调 → RtmpSession::sendRtmpPacket() → RtmpChunkEncoder::encode() — Chunk 编码 → Socket::send() → ::send(fd, data, len) 系统调用10. RTMP vs RTSP 对比维度RTMPRTSP传输层TCPTCP UDP可选数据封装Chunk AMFRTP握手C0/C1/S0/S1/S2/C21537*3字节无直接请求媒体描述MetaData (AMF)SDP延迟1-3 秒0.5-1 秒加密RTMPSTLS无标准加密浏览器播放需 Flash/转 FLV不支持11. 小结ZLMediaKit 的 RTMP 实现要点Simple/Complex 双握手自动检测 C1 格式兼容 Flash 和非 Flash 客户端Chunk 分片大 Message 按 chunk_size 分片Type 0/3 Header 优化带宽AMF0 序列化Command 消息使用 AMF0 编解码RingBuffer 分发推流写入 RingBuffer拉流 Reader 回调发送零拷贝缓存关键帧新拉流端连接时先发送 MetaData Sequence Header确保快速起播下一篇HTTP-FLV 低延迟直播流分析