import ptf import random import ipaddress from ptf.testutils import * from ptf.thriftutils import * from scapy.all import * from scapy.contrib.bfd import BFD def simple_bfd_packet( eth_dst="10:70:fd:03:c0:bd", eth_src="0a:0a:0a:0a:01:28", ip_src="10.10.1.40", ip_dst="10.254.60.1", ip_ttl=255, udp_sport=3784, udp_dport=3784, sta=0, my_discriminator=1, your_discriminator=2, ): """ Return a simple BFD packet """ bfd = BFD(sta=sta, my_discriminator=my_discriminator, your_discriminator=your_discriminator) pkt = simple_udp_packet( eth_dst=eth_dst, eth_src=eth_src, ip_src=ip_src, ip_dst=ip_dst, ip_ttl=ip_ttl, udp_sport=udp_sport, udp_dport=udp_dport, udp_payload=bfd ) return pkt def simple_vlan_udp_packet(eth_dst, eth_src, vlan_vid, ip_src="192.168.0.1", ip_dst="192.168.0.2", udp_sport=1234, udp_dport=80, udp_payload = ""): # 构建Ethernet帧 ether_frame = Ether(src=eth_src, dst=eth_dst) # 添加VLAN标签 vlan_frame = ether_frame / Dot1Q(vlan=vlan_vid) # 添加IP头 ip_packet = IP(src=ip_src, dst=ip_dst) # 添加UDP头 udp_packet = UDP(sport=udp_sport, dport=udp_dport) # 添加有效负载 udp_payload_packet = udp_packet / udp_payload # 构建最终的数据包 final_packet = vlan_frame / ip_packet / udp_payload_packet return final_packet def simple_qinq_udp_packet(eth_src, eth_dst, outer_vlan_id, inner_vlan_id, ip_src="192.168.0.1", ip_dst="192.168.0.2", udp_sport=1234,udp_dport=80, udp_payload = ""): # 构建Ethernet帧 ether_frame = Ether(src=eth_src, dst=eth_dst) # 添加QinQ标签 qinq_frame = ether_frame / Dot1Q(vlan=outer_vlan_id) / Dot1Q(vlan=inner_vlan_id) # 添加IP头 ip_packet = IP(src=ip_src, dst=ip_dst) # 添加UDP头 udp_packet = UDP(sport=udp_sport, dport=udp_dport) # 添加有效负载 udp_payload_packet = udp_packet / udp_payload # 构建最终的数据包 final_packet = qinq_frame / ip_packet / udp_payload_packet return final_packet def generate_random_tcp_port(): """ 生成随机的 TCP 端口号 """ return random.randint(1024, 65535) def generate_random_ipv4(): """ 生成随机的 IPv4 地址 """ # 随机生成四个 0-255 之间的数字 octets = [str(random.randint(0, 255)) for _ in range(4)] # 将四个数字组合成一个 IPv4 地址 ip_addr = ".".join(octets) # 将字符串类型的 IP 地址转换为 IPv4Address 对象 return ipaddress.IPv4Address(ip_addr) def get_port_packet_count(self, port): return self.dataplane.rx_counters[port]