基于Python的多进程UDP报文发送工具
[rootlocalhost ~]# python udp_sender.py --count 100000 --threads 4 --pps 40000启动高速UDP发送端...发送数量: 100000源地址: 1.1.1.1:6666 - 目的地址: 2.1.1.1:7777预计速率: 40000.00 pps每个进程速率: 10000.00 ppsTTL: 64进程数: 4调试模式: 关闭发送模式: 速率控制中断提示: 按CtrlC可中断发送将强制终止所有进程发送完成统计信息:预计发送: 100000 个报文总耗时: 7.427018 秒预计速率: 40000.00 pps预计总时间: 2.50 秒进程数: 4[rootlocalhost ~]# python udp_sender.py -husage: udp_sender.py [-h] [--count COUNT] [--src-ip SRC_IP][--dst-ip DST_IP] [--src-port SRC_PORT][--dst-port DST_PORT] [--pps PPS] [--ttl TTL][--threads THREADS] [--debug]optional arguments:-h, --help show this help message and exit--count COUNT, -c COUNT要发送的数据包数量 (默认: 10000)--src-ip SRC_IP, -s SRC_IP源IP地址基础值 (默认: 1.1.1.1)--dst-ip DST_IP, -d DST_IP目的IP地址基础值 (默认: 2.1.1.1)--src-port SRC_PORT, -sp SRC_PORT源端口 (默认: 6666)--dst-port DST_PORT, -dp DST_PORT目的端口 (默认: 7777)--pps PPS, -p PPS 每秒发送报文数(pps) (默认:10.0设为0则尽力快速发送)--ttl TTL, -t TTL IP报文TTL值 (默认: 64)--threads THREADS 发送进程数 (默认: 1建议1-4)--debug 启用调试输出显示每个包的发送信息发送端#!/usr/bin/python2 # coding:utf-8 import socket import struct import sys import time import argparse import random import os import multiprocessing import signal import sys class HighSpeedUDPSender: 高速UDP数据包发送器 - 使用进程替代线程 def __init__(self, packet_count10000, src_port6666, dst_port7777, base_src_ip1.1.1.1, base_dst_ip2.1.1.1, pps10, ttl64, debugFalse, threads1): self.packet_count packet_count self.src_port src_port self.dst_port dst_port self.base_src_ip base_src_ip self.base_dst_ip base_dst_ip self.pps pps # 总每秒发送报文数 self.ttl ttl self.debug debug self.processes threads # 使用进程数 # 计算每个进程的pps if self.pps 0: self.process_pps 0 else: self.process_pps self.pps / float(self.processes) # 预计算IP地址整数形式 self.base_src_ip_int self.ip_to_int(base_src_ip) self.base_dst_ip_int self.ip_to_int(base_dst_ip) # 预生成随机IP标识符 self.ip_ids [random.randint(0, 65535) for _ in range(min(packet_count, 65536))] # 存储进程引用 self.process_list [] # 中断标志 self.interrupted False def signal_handler(self, signum, frame): 信号处理函数 - 立即终止所有子进程并退出 if not self.interrupted: print(\n接收到中断信号正在终止所有进程...) self.interrupted True # 终止所有子进程 for p in self.process_list: try: p.terminate() # 强制终止进程 except: pass # 等待子进程结束 for p in self.process_list: try: p.join(timeout0.5) # 等待0.5秒 except: pass print(所有进程已终止程序退出) sys.exit(1) # 退出程序 def send_packets_for_process(self, process_id, start_idx, end_idx): 进程发送函数 - 专注于发送不检查中断 try: # 创建套接字 sock socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW) sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024 * 1024) sock.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) # 计算发送间隔 if self.process_pps 0: interval_per_packet 0 else: interval_per_packet 1.0 / self.process_pps if interval_per_packet 0.1: # 最大等待间隔为0.1秒 interval_per_packet 0.1 # 发送数据包 for i in range(start_idx, end_idx): # 计算递增的IP地址 src_ip_int self.base_src_ip_int i dst_ip_int self.base_dst_ip_int i seq_num i # 使用循环的IP标识符 ip_id self.ip_ids[i % len(self.ip_ids)] # 构建数据包 packet self.build_custom_packet(src_ip_int, dst_ip_int, seq_num, ip_id) dst_ip self.int_to_ip(dst_ip_int) # 发送数据包 try: sock.sendto(packet, (dst_ip, self.dst_port)) # 调试输出 if self.debug and i % 1000 0: src_ip self.int_to_ip(src_ip_int) print(进程%d发送报文 %d: 源 %s:%s - 目的 %s:%s % (process_id, i1, src_ip, self.src_port, dst_ip, self.dst_port)) except Exception as e: if self.debug: src_ip self.int_to_ip(src_ip_int) print(进程%d发送失败[%d]: 源 %s:%s - 目的 %s:%s, 错误: %s % (process_id, i1, src_ip, self.src_port, dst_ip, self.dst_port, str(e))) # 控制发送速率 if interval_per_packet 0 and i end_idx - 1: time.sleep(interval_per_packet) # 关闭套接字 sock.close() except Exception as e: if self.debug: print(进程%d发生错误: %s % (process_id, str(e))) def start_sending(self): 开始发送数据包 if os.getuid() ! 0: print(错误需要root权限运行此脚本) print(请使用sudo运行: sudo python %s [参数] % sys.argv[0]) return # 设置信号处理器 signal.signal(signal.SIGINT, self.signal_handler) signal.signal(signal.SIGTERM, self.signal_handler) print(启动高速UDP发送端...) print( 发送数量: %d % self.packet_count) print( 源地址: %s:%s - 目的地址: %s:%s % (self.base_src_ip, self.src_port, self.base_dst_ip, self.dst_port)) print( 预计速率: %.2f pps % self.pps) print( 每个进程速率: %.2f pps % self.process_pps) print( TTL: %d % self.ttl) print( 进程数: %d % self.processes) print( 调试模式: %s % (开启 if self.debug else 关闭)) print( 发送模式: %s % (尽力快速发送 if self.pps 0 else 速率控制)) print( 中断提示: 按CtrlC可中断发送将强制终止所有进程) print() start_time time.time() try: if self.processes 1: # 多进程发送 packets_per_process self.packet_count // self.processes for i in range(self.processes): start_idx i * packets_per_process end_idx (i 1) * packets_per_process if i self.processes - 1 else self.packet_count p multiprocessing.Process( targetself.send_packets_for_process, args(i, start_idx, end_idx) ) self.process_list.append(p) p.start() # 等待所有进程完成 for p in self.process_list: p.join() else: # 单进程发送 self.send_packets_for_process(0, 0, self.packet_count) except KeyboardInterrupt: # 正常情况下不会执行到这里因为信号处理器会处理中断 print(\n收到中断信号正在停止发送...) except Exception as e: print(\n发送过程中发生错误: {}.format(str(e))) end_time time.time() # 打印统计信息 total_time end_time - start_time print(\n *60) print(发送完成) print(*60) print(统计信息:) print( 预计发送: %d 个报文 % self.packet_count) print( 总耗时: %.6f 秒 % total_time) if self.pps 0: print( 预计速率: %.2f pps % self.pps) print( 预计总时间: %.2f 秒 % (self.packet_count / self.pps if self.pps 0 else 0)) else: print( 预计速率: 尽力快速发送) print( 进程数: %d % self.processes) print(*60) def build_custom_packet(self, src_ip_int, dst_ip_int, seq_num, ip_id): 构建自定义IP/UDP报文 # IP头字段 ip_ver 4 ip_ihl 5 ip_tos 0 ip_tot_len 0 ip_frag_off 0 ip_ttl self.ttl ip_proto socket.IPPROTO_UDP ip_check 0 # 确保IP标识符在有效范围内 ip_id ip_id 0xFFFF # UDP头字段 data (Message %07d % (seq_num 1)).encode() # 计算UDP长度 udp_length 8 len(data) if udp_length 65535: max_data_len 65535 - 8 data data[:max_data_len] udp_length 8 len(data) udp_src_port self.src_port udp_dst_port self.dst_port udp_checksum 0 # 确保端口在有效范围内 udp_src_port udp_src_port 0xFFFF udp_dst_port udp_dst_port 0xFFFF udp_length udp_length 0xFFFF # 构建UDP头部和数据 udp_header struct.pack(!HHHH, udp_src_port, udp_dst_port, udp_length, udp_checksum) udp_packet udp_header data # 计算IP包总长度 ip_tot_len 20 len(udp_packet) ip_tot_len ip_tot_len 0xFFFF # 构建IP头部 ip_ver_ihl (ip_ver 4) ip_ihl ip_header struct.pack(!BBHHHBBH4s4s, ip_ver_ihl, ip_tos, ip_tot_len, ip_id, ip_frag_off, ip_ttl, ip_proto, ip_check, self.int_to_bytes(src_ip_int, 4, big), self.int_to_bytes(dst_ip_int, 4, big)) return ip_header udp_packet def ip_to_int(self, ip_str): 将IP地址字符串转换为整数 parts ip_str.split(.) return (int(parts[0]) 24) (int(parts[1]) 16) (int(parts[2]) 8) int(parts[3]) def int_to_ip(self, ip_int): 将整数转换为IP地址字符串 return ..join(map(str, [ (ip_int 24) 0xFF, (ip_int 16) 0xFF, (ip_int 8) 0xFF, ip_int 0xFF ])) def int_to_bytes(self, n, length, byteorderbig): 将整数转换为字节串 if sys.version_info[0] 2: return n.to_bytes(length, byteorder) else: bytes_list [] for _ in range(length): bytes_list.append(chr(n 0xFF)) n 8 if byteorder big: bytes_list.reverse() return .join(bytes_list) def parse_args(): 解析命令行参数 parser argparse.ArgumentParser( description高速UDP数据包发送器 - 发送递增IP地址的UDP报文, formatter_classargparse.RawDescriptionHelpFormatter, epilog 使用示例: sudo python %(prog)s --count 1000 sudo python %(prog)s --count 1000000 --pps 1000 sudo python %(prog)s --count 1000000 --pps 0 --threads 4 sudo python %(prog)s --count 5000 --src-ip 10.0.0.1 --dst-ip 10.0.0.2 sudo python %(prog)s --count 10000 --pps 100 --ttl 128 sudo python %(prog)s --count 100 --debug ) parser.add_argument(--count, -c, typeint, default10000, help要发送的数据包数量 (默认: 10000)) parser.add_argument(--src-ip, -s, default1.1.1.1, help源IP地址基础值 (默认: 1.1.1.1)) parser.add_argument(--dst-ip, -d, default2.1.1.1, help目的IP地址基础值 (默认: 2.1.1.1)) parser.add_argument(--src-port, -sp, typeint, default6666, help源端口 (默认: 6666)) parser.add_argument(--dst-port, -dp, typeint, default7777, help目的端口 (默认: 7777)) parser.add_argument(--pps, -p, typefloat, default10.0, help每秒发送报文数(pps) (默认: 10.0设为0则尽力快速发送)) parser.add_argument(--ttl, -t, typeint, default64, helpIP报文TTL值 (默认: 64)) parser.add_argument(--threads, typeint, default1, help发送进程数 (默认: 1建议1-4)) parser.add_argument(--debug, actionstore_true, help启用调试输出显示每个包的发送信息) return parser.parse_args() if __name__ __main__: args parse_args() sender HighSpeedUDPSender( packet_countargs.count, src_portargs.src_port, dst_portargs.dst_port, base_src_ipargs.src_ip, base_dst_ipargs.dst_ip, ppsargs.pps, ttlargs.ttl, debugargs.debug, threadsargs.threads ) sender.start_sending()接收端#!/usr/bin/python # coding:utf-8 import socket import struct import argparse import sys import re import fcntl import os import time import signal class PromiscuousUDPReceiver: def __init__(self, interfaceNone, src_cidrNone, dst_cidrNone, src_portNone, dst_portNone, protocoludp, debugFalse, timeout0): self.interface interface self.src_cidr src_cidr self.dst_cidr dst_cidr self.src_port src_port self.dst_port dst_port self.protocol protocol.lower() self.running True self.packet_count 0 self.parsed_count 0 self.match_count 0 self.send_count 0 self.debug debug self.timeout timeout self.last_packet_time time.time() # 记录最后收到数据包的时间 def ip_in_cidr(self, ip, cidr): 检查IP地址是否在CIDR范围内 if not cidr: return True try: if / not in cidr: return ip cidr network_str, mask_bits cidr.split(/) mask_bits int(mask_bits) def ip_to_int(ip_str): parts ip_str.split(.) return (int(parts[0]) 24) (int(parts[1]) 16) (int(parts[2]) 8) int(parts[3]) ip_int ip_to_int(ip) network_int ip_to_int(network_str) mask (0xFFFFFFFF (32 - mask_bits)) 0xFFFFFFFF return (ip_int mask) (network_int mask) except Exception as e: self.safe_print(CIDR匹配错误: %s, repr(e), debugTrue) return False def should_process_packet(self, src_ip, dst_ip, src_port, dst_port, protocol): 根据过滤条件判断是否处理该数据包 if self.protocol and self.protocol ! protocol.lower(): return False if self.src_port and int(self.src_port) ! src_port: return False if self.dst_port and int(self.dst_port) ! dst_port: return False if self.src_cidr and not self.ip_in_cidr(src_ip, self.src_cidr): return False if self.dst_cidr and not self.ip_in_cidr(dst_ip, self.dst_cidr): return False return True def set_promiscuous_mode(self, sock, interface): 设置网络接口为混杂模式 if not interface: return True try: SIOCGIFFLAGS 0x8913 SIOCSIFFLAGS 0x8914 IFF_PROMISC 0x100 ifr struct.pack(16sH, interface[:15], 0) result fcntl.ioctl(sock.fileno(), SIOCGIFFLAGS, ifr) flags struct.unpack(H, result[16:18])[0] flags | IFF_PROMISC new_ifr struct.pack(16sH, interface[:15], flags) fcntl.ioctl(sock.fileno(), SIOCSIFFLAGS, new_ifr) self.safe_print(接口 %s 已设置为混杂模式, interface) return True except Exception as e: self.safe_print(设置混杂模式失败: %s, repr(e), debugTrue) return False def create_packet_socket(self): 创建PF_PACKET原始套接字用于接收所有流量 try: sock socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(0x0003)) return sock except Exception as e: self.safe_print(创建PF_PACKET套接字失败: %s, repr(e), debugTrue) return None def parse_packet(self, packet): 解析数据包返回源IP、目的IP、源端口、目的端口、协议、载荷 try: if len(packet) 14: return None eth_header packet[:14] eth struct.unpack(!6s6sH, eth_header) eth_type eth[2] offset 14 actual_eth_type eth_type if actual_eth_type 0x8100: if len(packet) 18: vlan_inner_type struct.unpack(!H, packet[16:18])[0] actual_eth_type vlan_inner_type offset 18 else: return None if actual_eth_type ! 0x0800: return None if len(packet) offset 20: return None ip_header packet[offset:offset20] iph struct.unpack(!BBHHHBBH4s4s, ip_header) version_ihl iph[0] ihl version_ihl 0xF iph_length ihl * 4 if ihl 5 or len(packet) offset iph_length: return None protocol iph[6] src_ip socket.inet_ntoa(iph[8]) dst_ip socket.inet_ntoa(iph[9]) transport_start offset iph_length if protocol 17: if len(packet) transport_start 8: return None udp_header packet[transport_start:transport_start8] udph struct.unpack(!HHHH, udp_header) src_port udph[0] dst_port udph[1] udp_length udph[2] payload_start transport_start 8 payload_length udp_length - 8 if len(packet) payload_start payload_length: return None payload packet[payload_start:payload_start payload_length] return (src_ip, dst_ip, src_port, dst_port, udp, payload) elif protocol 6: if len(packet) transport_start 20: return None tcp_header packet[transport_start:transport_start20] tcph struct.unpack(!HHLLBBHHH, tcp_header) src_port tcph[0] dst_port tcph[1] tcp_header_length (tcph[4] 4) * 4 payload_start transport_start tcp_header_length if len(packet) payload_start: return None payload packet[payload_start:] return (src_ip, dst_ip, src_port, dst_port, tcp, payload) except Exception as e: self.safe_print(解析数据包错误: %s, repr(e), debugTrue) return None def send_acknowledgment(self, src_ip, dst_ip, src_port, dst_port, protocol, original_data): 使用原始套接字发送确认消息交换源和目的地址/端口 try: raw_sock socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW) raw_sock.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) reply_src_ip dst_ip reply_dst_ip src_ip reply_src_port dst_port reply_dst_port src_port ack_data ACK from %s:%d % (reply_src_ip, reply_src_port) packet self.build_custom_udp_packet( src_ipreply_src_ip, dst_ipreply_dst_ip, src_portreply_src_port, dst_portreply_dst_port, dataack_data ) raw_sock.sendto(packet, (reply_dst_ip, 0)) self.send_count 1 # 调试模式下显示发送信息 if self.debug: self.safe_print( [原始套接字] 发送确认 [#%d]: %s:%s - %s:%s | 数据: %s, self.send_count, reply_src_ip, reply_src_port, reply_dst_ip, reply_dst_port, ack_data) # 调试模式下显示监控信息 if self.debug and self.send_count % 100 0: self.safe_print([%d] 监控中... 最新: %s:%s - %s:%s [UDP] | 已发送: %d 个确认, self.send_count, reply_src_ip, reply_src_port, reply_dst_ip, reply_dst_port, self.send_count) raw_sock.close() except Exception as e: self.safe_print(发送原始套接字确认失败: %s, repr(e), debugTrue) def build_custom_udp_packet(self, src_ip, dst_ip, src_port, dst_port, data): 构建完整的自定义IP/UDP数据包 if isinstance(data, unicode): data data.encode(utf-8) udp_header_length 8 udp_total_length udp_header_length len(data) udp_checksum 0 udp_header struct.pack(!HHHH, src_port, dst_port, udp_total_length, udp_checksum) ip_version_ihl (4 4) | 5 ip_tos 0 ip_total_length 20 udp_total_length import random ip_identification random.randint(0, 65535) ip_flags_fragment 0 ip_ttl 128 ip_protocol socket.IPPROTO_UDP ip_header_checksum 0 src_ip_bytes socket.inet_aton(src_ip) dst_ip_bytes socket.inet_aton(dst_ip) ip_header_without_checksum struct.pack(!BBHHHBBH, ip_version_ihl, ip_tos, ip_total_length, ip_identification, ip_flags_fragment, ip_ttl, ip_protocol, ip_header_checksum) ip_header ip_header_without_checksum src_ip_bytes dst_ip_bytes ip_header self.calculate_ip_checksum(ip_header) return ip_header udp_header data def calculate_ip_checksum(self, ip_header): 计算IP头部校验和 header_bytes bytearray(ip_header) header_bytes[10] 0 header_bytes[11] 0 checksum 0 for i in range(0, len(header_bytes), 2): if i 1 len(header_bytes): word (header_bytes[i] 8) header_bytes[i 1] else: word (header_bytes[i] 8) checksum word checksum (checksum 0xFFFF) (checksum 16) checksum ~checksum 0xFFFF header_bytes[10] checksum 8 header_bytes[11] checksum 0xFF return bytes(header_bytes) def safe_decode_payload(self, payload): 安全解码载荷数据 if not payload: return [无数据] try: decoded payload.decode(utf-8, errorsignore) if decoded and decoded.strip(): return decoded else: return [二进制数据长度: %d 字节] % len(payload) except: try: hex_str payload.encode(hex) if len(hex_str) 100: hex_str hex_str[:100] ... return [十六进制: %s] % hex_str except: return [二进制数据长度: %d 字节] % len(payload) def safe_print(self, format_str, *args, **kwargs): 安全的打印函数 debug_mode kwargs.get(debug, False) if debug_mode and not self.debug: return try: import re pattern r%[0-9]*\.?[0-9]*[sdifFeEgGxXocrb] safe_format re.sub(pattern, %s, format_str) safe_args [] for arg in args: if isinstance(arg, (int, long)): safe_args.append(str(arg)) elif isinstance(arg, float): safe_args.append(%.6f % arg) elif isinstance(arg, unicode): safe_args.append(arg.encode(utf-8, ignore)) elif isinstance(arg, str): try: safe_args.append(arg.decode(utf-8, ignore).encode(utf-8, ignore)) except: safe_args.append(repr(arg)[:50]) else: safe_args.append(str(arg)) result safe_format % tuple(safe_args) print(result) except Exception as e: try: print(打印错误: 格式字符串: %s, 错误: %s % (repr(format_str)[:100], repr(e))) except: print(打印错误: 无法输出错误信息) def signal_handler(self, signum, frame): 信号处理函数用于处理用户中断 self.safe_print(\n接收到中断信号正在停止接收...) self.running False def start_receiver(self): 启动混杂模式数据包接收 if os.getuid() ! 0: print(错误需要root权限运行此脚本) print(请使用sudo运行: sudo python %s [参数] % sys.argv[0]) return sock self.create_packet_socket() if not sock: return try: if self.interface: try: sock.bind((self.interface, 0)) self.safe_print(成功绑定到接口: %s, self.interface) except Exception as e: self.safe_print(绑定到接口 %s 失败: %s, self.interface, repr(e), debugTrue) self.safe_print(尝试绑定到所有接口...) sock.bind((any, 0)) else: sock.bind((any, 0)) self.safe_print(绑定到所有接口) if self.interface: self.set_promiscuous_mode(sock, self.interface) # 设置信号处理器用于捕获用户中断 signal.signal(signal.SIGINT, self.signal_handler) # 如果设置了timeout0设置socket超时 if self.timeout 0: sock.settimeout(self.timeout) self.safe_print(套接字超时设置为: %d 秒 % self.timeout) else: self.safe_print(套接字不设置超时等待用户中断信号...) self.safe_print(混杂模式数据包接收器启动) self.safe_print( 监听接口: %s, (self.interface or 所有接口)) self.safe_print( 协议过滤: %s, self.protocol.upper()) self.safe_print( 源地址过滤: %s, (self.src_cidr or 所有地址)) self.safe_print( 目的地址过滤: %s, (self.dst_cidr or 所有地址)) self.safe_print( 源端口过滤: %s, (self.src_port or 所有端口)) self.safe_print( 目的端口过滤: %s, (self.dst_port or 所有端口)) self.safe_print( 超时设置: %s % (%d 秒 % self.timeout if self.timeout 0 else 无超时)) self.safe_print(等待数据包...) start_time time.time() while self.running: try: # 接收原始数据包 packet, addr sock.recvfrom(65535) self.packet_count 1 self.last_packet_time time.time() # 更新最后收到数据包的时间 result self.parse_packet(packet) if not result: continue # 成功解析了一个包 self.parsed_count 1 src_ip, dst_ip, src_port, dst_port, protocol, payload result if self.should_process_packet(src_ip, dst_ip, src_port, dst_port, protocol): self.match_count 1 # 调试模式下显示匹配数据包信息 if self.debug: decoded_data self.safe_decode_payload(payload) self.safe_print([%d] 匹配数据包: %s:%s - %s:%s [%s] | 载荷: %s, self.match_count, src_ip, src_port, dst_ip, dst_port, protocol.upper(), decoded_data[:100]) # 调试模式下显示接收统计 if self.debug and self.match_count % 100 0: decoded_data self.safe_decode_payload(payload) self.safe_print([%s] 接收统计... 最新接收: %s:%s - %s:%s [%s] | 已接收匹配包: %s, 已发送确认: %s, self.match_count, src_ip, src_port, dst_ip, dst_port, protocol.upper(), self.match_count, self.send_count) self.send_acknowledgment(src_ip, dst_ip, src_port, dst_port, protocol, payload) except socket.timeout: # 套接字超时退出循环 self.safe_print(\n套接字超时%d 秒内未收到数据包正在停止接收... % self.timeout) self.running False break except KeyboardInterrupt: self.safe_print(\n用户中断停止接收...) break except Exception as e: self.safe_print([%s] 处理错误: %s, self.packet_count, repr(e), debugTrue) except Exception as e: self.safe_print(主循环错误: %s, repr(e), debugTrue) finally: sock.close() end_time time.time() total_time end_time - start_time self.safe_print(套接字已关闭) self.safe_print(运行时长: %.2f 秒 % total_time) self.safe_print(统计信息: 总共接收原始包 %d 个, 成功解析 %d 个, 匹配 %d 个请求包, 发送 %d 个确认报文 % (self.packet_count, self.parsed_count, self.match_count, self.send_count)) def main(): parser argparse.ArgumentParser(description混杂模式UDP数据包接收器) parser.add_argument(--interface, -i, defaultens192, help网络接口名称) parser.add_argument(--src-cidr, -s, help源地址CIDR过滤) parser.add_argument(--dst-cidr, -d, help目的地址CIDR过滤) parser.add_argument(--src-port, -sp, default6666, help源端口过滤) parser.add_argument(--dst-port, -dp, default7777, help目的端口过滤) parser.add_argument(--protocol, -p, defaultudp, choices[udp, tcp], help协议过滤) parser.add_argument(--debug, actionstore_true, help启用调试输出显示匹配数据包、发送确认和监控信息) parser.add_argument(--timeout, -t, typeint, default0, help套接字超时时间(秒)。0表示不设置超时通过CtrlC中断大于0表示在指定秒数内无数据包则自动退出) args parser.parse_args() receiver PromiscuousUDPReceiver( interfaceargs.interface, src_cidrargs.src_cidr, dst_cidrargs.dst_cidr, src_portargs.src_port, dst_portargs.dst_port, protocolargs.protocol, debugargs.debug, timeoutargs.timeout ) receiver.start_receiver() if __name__ __main__: main()