"""PTF tests for the mrzcpd bonding data path.

Each test case starts an mrzcpd instance with a generated configuration in
which a bond device (``bond_m``, slaves ``veth0``..``veth3``) is wired to
``veth4`` through virtual wire 0, then drives traffic through the PTF
dataplane.
"""
import ptf
from ptf.base_tests import BaseTest
from ptf.testutils import *
from scapy.all import *
from mrzcpd import Mrzcpd
from common_pkt import *
import ptf.mask as mask

# Here we construct the test network
# virtual-vwire: bond_m <----> veth4
# bond_m: veth0,veth1,veth2,veth3
#
# The two ``%u`` placeholders are filled, in order, with the bonding mode and
# the bonding transmit policy (see the constants below).
bond_conf_template = """
[device]
device=bond_m,veth0,veth1,veth2,veth3,veth4
sz_tunnel=8192
sz_buffer=0

[device:bond_m]
promisc=1
role=1
type=1
bond_slaves=veth0,veth1,veth2,veth3
bond_mode=%u
bond_xmit_policy=%u

[device:veth0]
driver=2
promisc=1

[device:veth1]
driver=2
promisc=1

[device:veth2]
driver=2
promisc=1

[device:veth3]
driver=2
promisc=1

[device:veth4]
promisc=1
driver=2
role=1

[service]
iocore=1
distmode=2
hashmode=0

[limits]
nr_max_ef_adapters=32
nr_max_vwires=32
nr_max_tera_adapters=32
nr_max_link_dbs=32

[eal]
virtaddr=0x600000000000
loglevel=8
nohuge=1
mem=65535

[keepalive]
check_spinlock=1

[http_server]
listen_addr=127.0.0.1

[ctrlzone]
ctrlzone0=tunnat,64
ctrlzone1=vsys,64

[pool]
create_mode=3
sz_direct_pktmbuf=4096
sz_indirect_pktmbuf=4096
sz_cache=256
sz_data=3000

[ctrlmsg]
listen_addr=0.0.0.0
listen_port=46789

[rpc]
addr=127.0.0.1
port=56789

# sid
[ef_adapters]
sid_start=100
sid_end=200
max_rules=256

[vwires]
sid_start=300
sid_end=400
max_rules=256

[service_lb]
sid_start=1000
sid_end=2000

[vwire:0]
vwire_id=0
interface_int=bond_m
interface_ext=veth4
"""

# consts
# Values substituted into ``bond_mode`` of the configuration template.
BONDING_MODE_ROUND_ROBIN = 0
BONDING_MODE_ACTIVE_BACKUP = 1
BONDING_MODE_BALANCE = 2
BONDING_MODE_BROADCAST = 3
BONDING_MODE_8023AD = 4

# Values substituted into ``bond_xmit_policy`` of the configuration template.
BONDING_XMIT_POLICY_LAYER2 = 0
BONDING_XMIT_POLICY_LAYER23 = 1
BONDING_XMIT_POLICY_LAYER34 = 2


@group("bond_balance_test")
class TestBondBalance(BaseTest):
    """Forwarding test with the bond in balance mode, layer 3+4 policy."""

    def setUp(self):
        """Attach the shared PTF dataplane to this test case."""
        self.dataplane = ptf.dataplane_instance

    def __init__(self):
        BaseTest.__init__(self)

    def test_bond_to_eth(self):
        """Send a TCP packet on every bond member and expect it on port 4."""
        # create a simple tcp packet with any src/dst mac address, any ip address
        pkt = simple_tcp_packet(
            eth_dst="10:70:fd:03:c0:bd",
            eth_src="0a:0a:0a:0a:01:28",
            ip_src="10.10.10.10",
            ip_dst="10.10.10.20",
        )

        # send this packet from each bond member (PTF ports 0..3) and check
        # that every one of them shows up unchanged on port 4
        for bond_member_id in range(4):
            send_packet(self, bond_member_id, pkt)
            verify_packet(self, pkt, 4)

    def test_eth_to_bond(self):
        """Send a TCP packet on port 4 and expect it on some bond member."""
        # create a simple tcp packet with any src/dst mac address, any ip address
        pkt = simple_tcp_packet(
            eth_dst="0a:0a:0a:0a:01:28",
            eth_src="10:70:fd:03:c0:bd",
            ip_src="10.10.10.10",
            ip_dst="10.10.10.20",
        )

        # send this packet from veth4
        send_packet(self, 4, pkt)
        # the bond picks the egress member, so accept any of ports 0..3
        verify_packet_any_port(self, pkt, [0, 1, 2, 3])

    def runTest(self):
        """Start mrzcpd in balance mode, run both directions, then stop it."""
        # Build the runner before entering ``try``: if construction fails
        # there is nothing to stop, and the ``finally`` clause must not hide
        # the original error behind an unbound-name failure.
        mrzcpd_runner = Mrzcpd(
            bond_conf_template
            % (BONDING_MODE_BALANCE, BONDING_XMIT_POLICY_LAYER34),
            "",
        )
        try:
            mrzcpd_runner.start()
            self.test_bond_to_eth()
            self.test_eth_to_bond()
        finally:
            mrzcpd_runner.stop()


@group("bond_lacp_test")
class TestBondLACP(BaseTest):
    """Checks that the bond emits slow-protocol frames in 802.3ad mode."""

    def setUp(self):
        """Attach the shared PTF dataplane to this test case."""
        self.dataplane = ptf.dataplane_instance

    def __init__(self):
        BaseTest.__init__(self)

    def test_bond_to_eth(self):
        """Send a TCP packet on every bond member and expect it on port 4."""
        # create a simple tcp packet with any src/dst mac address, any ip address
        pkt = simple_tcp_packet(
            eth_dst="10:70:fd:03:c0:bd",
            eth_src="0a:0a:0a:0a:01:28",
            ip_src="10.10.10.10",
            ip_dst="10.10.10.20",
        )

        # send this packet from each bond member (PTF ports 0..3) and check
        # that every one of them shows up unchanged on port 4
        for bond_member_id in range(4):
            send_packet(self, bond_member_id, pkt)
            verify_packet(self, pkt, 4)

    def test_eth_to_bond(self):
        """Send a TCP packet on port 4 and expect it on some bond member."""
        # create a simple tcp packet with any src/dst mac address, any ip address
        pkt = simple_tcp_packet(
            eth_dst="0a:0a:0a:0a:01:28",
            eth_src="10:70:fd:03:c0:bd",
            ip_src="10.10.10.10",
            ip_dst="10.10.10.20",
        )

        # send this packet from veth4
        send_packet(self, 4, pkt)
        # the bond picks the egress member, so accept any of ports 0..3
        verify_packet_any_port(self, pkt, [0, 1, 2, 3])

    def expect_lacp_packets(self):
        """Wait for a slow-protocol (ethertype 0x8809) frame on a bond member.

        Only the ethertype is compared: the source MAC, the destination MAC
        and the payload bytes of the template are all masked out.
        """
        eth_header_len = 14  # dst MAC (6) + src MAC (6) + ethertype (2)
        payload = b"\x01" * 8 + b"\x00" * 8

        # build the LACP packet template using scapy; Ether's own field names
        # are dst/src/type (eth_dst/eth_src are simple_*_packet keywords only)
        slow_protocol_pkt = Ether(
            dst="01:80:c2:00:00:02",
            src="10:10:10:10:10:10",
            type=0x8809,
        ) / Raw(load=payload)

        # only care about the eth_type
        expected = mask.Mask(slow_protocol_pkt)
        # the header is identified by its packet class, not by its name
        expected.set_do_not_care_packet(Ether, "src")
        expected.set_do_not_care_packet(Ether, "dst")
        # set_do_not_care() takes a bit offset and a bit width, so convert
        # the byte position/length of the payload into bits
        expected.set_do_not_care(eth_header_len * 8, len(payload) * 8)
        # NOTE(review): the template is 30 bytes long; if real LACPDUs are
        # longer and this PTF version compares lengths strictly, the Mask may
        # also need ignore_extra_bytes=True -- confirm against the PTF in use.

        # wait for the slow protocol pkt
        verify_packet_any_port(self, expected, [0, 1, 2, 3], timeout=120)

    def runTest(self):
        """Start mrzcpd in 802.3ad mode and wait for a slow-protocol frame."""
        # Build the runner before entering ``try``: if construction fails
        # there is nothing to stop, and the ``finally`` clause must not hide
        # the original error behind an unbound-name failure.
        mrzcpd_runner = Mrzcpd(
            bond_conf_template
            % (BONDING_MODE_8023AD, BONDING_XMIT_POLICY_LAYER34),
            "",
        )
        try:
            mrzcpd_runner.start()
            # NOTE: the forwarding checks are disabled in this mode; only the
            # emission of slow-protocol frames is verified.
            # self.test_bond_to_eth()
            # self.test_eth_to_bond()
            self.expect_lacp_packets()
        finally:
            mrzcpd_runner.stop()