summaryrefslogtreecommitdiff
path: root/ptf/test0418.py
diff options
context:
space:
mode:
Diffstat (limited to 'ptf/test0418.py')
-rw-r--r--ptf/test0418.py852
1 files changed, 0 insertions, 852 deletions
diff --git a/ptf/test0418.py b/ptf/test0418.py
deleted file mode 100644
index 15539ca..0000000
--- a/ptf/test0418.py
+++ /dev/null
@@ -1,852 +0,0 @@
-
-"""
-Simple PTF test for super_vlan
-"""
-
-######### STANDARD MODULE IMPORTS ########
-import unittest
-import logging
-import grpc
-import pdb
-
-######### PTF modules for BFRuntime Client Library APIs #######
-import ptf
-from ptf.testutils import *
-from bfruntime_client_base_tests import BfRuntimeTest
-import bfrt_grpc.bfruntime_pb2 as bfruntime_pb2
-import bfrt_grpc.client as gc
-
-########## Basic Initialization ############
-class BaseProgramTest(BfRuntimeTest):
- # The setUp() method is used to prepare the test fixture. Typically
- # you would use it to establich connection to the gRPC Server
- #
- # You can also put the initial device configuration there. However,
- # if during this process an error is encountered, it will be considered
- # as a test error (meaning the test is incorrect),
- # rather than a test failure
- #
- # Here is the stuff we set up that is ready to use
- # client_id
- # p4_name
- # bfrt_info
- # dev
- # dev_tgt
- # allports
- # tables -- the list of tables
- # Individual tables of the program with short names
- # ipv4_host
- # ipv4_lpm
- # nexthop
- def setUp(self):
- self.client_id = 0
- self.p4_name = "super_vlan"
- self.dev = 0
- self.dev_tgt = gc.Target(self.dev, pipe_id=0xFFFF)
-
- print("\n")
- print("Test Setup")
- print("==========")
-
- BfRuntimeTest.setUp(self, self.client_id, self.p4_name)
- self.bfrt_info = self.interface.bfrt_info_get(self.p4_name)
-
- print(" Connected to Device: {}, Program: {}, ClientId: {}".format(
- self.dev, self.p4_name, self.client_id))
-
- # Since this class is not a test per se, we can use the setup method
- # for common setup. For example, we can have our tables and annotations
- # ready
- self.is_l2_switch = self.bfrt_info.table_get("Ingress.is_l2_switch")
- self.is_l2_switch.info.key_field_annotation_add(
- "hdr.ethernet.dst_addr", "mac")
-
- self.Trunk_switch = self.bfrt_info.table_get("Ingress.Trunk_switch")
- self.Trunk_switch.info.key_field_annotation_add(
- "hdr.ethernet.dst_addr", "mac")
-
- self.Port_to_srcvid = self.bfrt_info.table_get("Ingress.Port_to_srcvid")
- self.Port_to_srcvid.info.key_field_annotation_add(
- "ig_intr_md.ingress_port", "int")
-
- self.dstIP_to_dstvid = self.bfrt_info.table_get("Ingress.dstIP_to_dstvid")
- self.dstIP_to_dstvid.info.key_field_annotation_add(
- "meta.dst_ipv4", "ipv4")
-
- self.src_supervid_to_mac = self.bfrt_info.table_get("Ingress.src_supervid_to_mac")
- self.src_supervid_to_mac.info.data_field_annotation_add(
- "new_mac_switch", "Ingress.set_super_MAC", "mac")
-
- self.ipv4_host = self.bfrt_info.table_get("Ingress.ipv4_host")
- self.ipv4_host.info.key_field_annotation_add(
- "meta.dst_ipv4", "ipv4")
-
- self.ipv4_lpm = self.bfrt_info.table_get("Ingress.ipv4_lpm")
- self.ipv4_lpm.info.key_field_annotation_add(
- "meta.dst_ipv4", "ipv4")
-
- self.nexthop = self.bfrt_info.table_get("Ingress.nexthop")
- self.nexthop.info.data_field_annotation_add(
- "new_mac_da", "Ingress.l3_switch", "mac")
- self.nexthop.info.data_field_annotation_add(
- "new_mac_sa", "Ingress.l3_switch", "mac")
-
- self.egr_vlan_port = self.bfrt_info.table_get("Egress.egr_vlan_port")
- self.egr_vlan_port.info.key_field_annotation_add(
- "meta.vid", "int")
-
-
- self.tables = [ self.is_l2_switch, self.Trunk_switch, self.Port_to_srcvid, self.dstIP_to_dstvid, self.src_supervid_to_mac, self.ipv4_host, self.ipv4_lpm, self.nexthop, self.egr_vlan_port ]
-
- # Create a list of all ports available on the device
- self.swports = []
- for (device, port, ifname) in ptf.config['interfaces']:
- self.swports.append(port)
- self.swports.sort()
-
- # Optional, but highly recommended
- self.cleanUp()
-
- # Use Cleanup Method to clear the tables before and after the test starts
- # (the latter is done as a part of tearDown()
- def cleanUp(self):
- print("\n")
- print("Table Cleanup:")
- print("==============")
-
- try:
- for t in self.tables:
- print(" Clearing Table {}".format(t.info.name_get()))
- keys = []
- for (d, k) in t.entry_get(self.dev_tgt):
- if k is not None:
- keys.append(k)
- t.entry_del(self.dev_tgt, keys)
- # Not all tables support default entry
- try:
- t.default_entry_reset(self.dev_tgt)
- except:
- pass
- except Exception as e:
- print("Error cleaning up: {}".format(e))
-
- # Use tearDown() method to return the DUT to the initial state by cleaning
- # all the configuration and clearing up the connection
- def tearDown(self):
- print("\n")
- print("Test TearDown:")
- print("==============")
-
- self.cleanUp()
-
- # Call the Parent tearDown
- BfRuntimeTest.tearDown(self)
-
-#
-# Individual tests can now be subclassed from BaseProgramTest
-#
-
-############################################################################
-################# I N D I V I D U A L T E S T S #########################
-############################################################################
-
-class HostSend(BaseProgramTest):
- def runTest(self):
- ingress_port = test_param_get("ingress_port", 0)
- ipv4_dst = test_param_get("ipv4_dst", "192.168.1.2")
- dst_addr = test_param_get("dst_addr", "00:00:02:00:00:02")
- egress_port = test_param_get("egress_port", 4)
- nexthop_id = test_param_get("nexthop_id", 100)
- same_flag = test_param_get("same_flag", True)
-
- print("\n")
- print("Test Run")
- print("========")
-
- #
- # Program an entry in nexthop: nexthop_id --> send(egress_port)
- #
- key_list = [
- self.nexthop.make_key([
- gc.KeyTuple('nexthop_id', nexthop_id)])
- ]
-
- data_list = [
- self.nexthop.make_data([], "Ingress.send")
- ]
-
- self.nexthop.entry_add(self.dev_tgt, key_list, data_list);
- print(" Added an entry to nexthop: {} --> send()".format(
- nexthop_id))
-
- #
- # Program an entry in ipv4_host: ipv4_dst --> set_nexthop(nexthop_id)
- #
-
- key_list = [
- self.ipv4_host.make_key([
- gc.KeyTuple('same_flag', same_flag),
- gc.KeyTuple('meta.dst_ipv4', ipv4_dst)])
- ]
-
- data_list = [
- self.ipv4_host.make_data([
- gc.DataTuple('nexthop', nexthop_id)], "Ingress.set_nexthop")
- ]
-
- self.ipv4_host.entry_add(self.dev_tgt, key_list, data_list);
- print(" Added an entry to ipv4_host: {},{} --> set_nexthop({})".
- format(same_flag, ipv4_dst, nexthop_id))
-
- #
- # Program an entry in is_l2_switch: dst_mac --> l2_switch
- #
- key_list = [
- self.is_l2_switch.make_key([
- gc.KeyTuple('hdr.ethernet.dst_addr', dst_addr)])
- ]
-
- data_list = [
- self.is_l2_switch.make_data([
- gc.DataTuple('port', egress_port)], "Ingress.l2_switch")
- ]
-
- self.is_l2_switch.entry_add(self.dev_tgt, key_list, data_list);
- print(" Added an entry to is_l2_switch: {} --> l2_switch({})".
- format(dst_addr, egress_port))
-
- #
- # Send a test packet
- #
- print(" Sending packet with IPv4 DST ADDR={} into port {}"
- .format(ipv4_dst, ingress_port))
- pkt = simple_tcp_packet(eth_dst=dst_addr,
- eth_src='00:00:02:00:00:01',
- ip_dst=ipv4_dst,
- ip_id=101,
- ip_ttl=64,
- ip_ihl=5)
- send_packet(self, ingress_port, pkt)
-
- #
- # Wait for the egress packet and verify it
- #
- print(" Expecting the packet to be forwarded to port {}"
- .format(egress_port))
- verify_packet(self, pkt, egress_port)
- print(" Packet received of port {}".format(egress_port))
-
-
-class HostDrop(BaseProgramTest):
- def runTest(self):
- ingress_port = test_param_get("ingress_port", 0)
- ipv4_dst = test_param_get("ipv4_dst", "192.168.1.3")
- dst_addr = test_param_get("dst_addr", "00:00:02:00:00:03")
- egress_port = test_param_get("egress_port", 5)
- nexthop_id = test_param_get("nexthop_id", 102)
- same_flag = test_param_get("same_flag", True)
-
- print("\n")
- print("Test Run")
- print("========")
-
- #
- # Program an entry in nexthop: nexthop_id --> drop()
- #
- key_list = [
- self.nexthop.make_key([
- gc.KeyTuple('nexthop_id', nexthop_id)])
- ]
-
- data_list = [
- self.nexthop.make_data([], "Ingress.drop")
- ]
-
- self.nexthop.entry_add(self.dev_tgt, key_list, data_list);
- print(" Added an entry to nexthop: {} --> drop()".
- format(nexthop_id))
-
- #
- # Program an entry in ipv4_host: ipv4_dst --> set_nexthop(nexthop_id)
- #
- key_list = [
- self.ipv4_host.make_key([
- gc.KeyTuple('same_flag', same_flag),
- gc.KeyTuple('meta.dst_ipv4', ipv4_dst)])
- ]
-
- data_list = [
- self.ipv4_host.make_data([
- gc.DataTuple('nexthop', nexthop_id)], "Ingress.set_nexthop")
- ]
-
- self.ipv4_host.entry_add(self.dev_tgt, key_list, data_list);
- print(" Added an entry to ipv4_host: {},{} --> set_nexthop({})".
- format(same_flag, ipv4_dst, nexthop_id))
-
- #
- # Program an entry in is_l2_switch: dst_mac --> l2_switch
- #
- key_list = [
- self.is_l2_switch.make_key([
- gc.KeyTuple('hdr.ethernet.dst_addr', dst_addr)])
- ]
-
- data_list = [
- self.is_l2_switch.make_data([
- gc.DataTuple('port', egress_port)], "Ingress.l2_switch")
- ]
-
- self.is_l2_switch.entry_add(self.dev_tgt, key_list, data_list);
- print(" Added an entry to is_l2_switch: {} --> l2_switch({})".
- format(dst_addr, egress_port))
-
-
- #
- # Send a test packet
- #
- print(" Sending packet with IPv4 DST ADDR={} into port {}"
- .format(ipv4_dst, ingress_port))
- pkt = simple_tcp_packet(eth_dst=dst_addr,
- eth_src='00:00:02:00:00:01',
- ip_dst=ipv4_dst,
- ip_id=101,
- ip_ttl=64,
- ip_ihl=5)
- send_packet(self, ingress_port, pkt)
- print(" Expecting No packets anywhere")
-
- #
- # Wait to make sure no packet egresses anywhere. self.swports is the
- # list of all ports the test is using. It is set up in the setUp()
- # method of the parent class
- #
- verify_no_packet_any(self, pkt, self.swports)
- print(" No packets received")
-
-
-class HostL3Switch(BaseProgramTest):
- def runTest(self):
- ingress_port = test_param_get("ingress_port", 0)
- ipv4_dst = test_param_get("ipv4_dst", "192.168.1.13")
- dst_addr = test_param_get("dst_addr", "00:00:00:00:00:01")
- new_mac_da = test_param_get("new_mac_da", "00:00:03:00:00:03")
- new_mac_sa = test_param_get("new_mac_sa", "00:00:00:00:00:01")
- egress_port = test_param_get("egress_port", 4)
- nexthop_id = test_param_get("nexthop_id", 101)
- same_flag = test_param_get("same_flag", False)
- src_supervid = test_param_get("src_supervid", 10)
- switch_mac = test_param_get("switch_mac", "00:00:00:00:00:01")
- dst_subvid = 3
- dst_supervid = 10
- src_subvid = 2
- src_supervid = 10
-
- print("\n")
- print("Test Run")
- print("========")
-
- #
- # Program an entry in nexthop:
- # nexthop_id --> l3_switch(egress_port. new_mac_da, new_mac_da)
- #
- key_list = [
- self.nexthop.make_key([
- gc.KeyTuple('nexthop_id', nexthop_id)])
- ]
-
- data_list = [
- self.nexthop.make_data([gc.DataTuple('port', egress_port),
- gc.DataTuple('new_mac_da', new_mac_da),
- gc.DataTuple('new_mac_sa', new_mac_sa)],
- "Ingress.l3_switch")
- ]
-
- self.nexthop.entry_add(self.dev_tgt, key_list, data_list);
- print(" Added an entry to nexthop: {} --> send({})".
- format(nexthop_id, egress_port))
-
- #
- # Program an entry in IPv4: ipv4_dst --> set_nexthop(nexthop_id)
- #
- key_list = [
- self.ipv4_host.make_key([
- gc.KeyTuple('same_flag', same_flag),
- gc.KeyTuple('meta.dst_ipv4', ipv4_dst)])
- ]
-
- data_list = [
- self.ipv4_host.make_data([
- gc.DataTuple('nexthop', nexthop_id)], "Ingress.set_nexthop")
- ]
-
- self.ipv4_host.entry_add(self.dev_tgt, key_list, data_list);
- print(" Added an entry to ipv4_host: {},{} --> set_nexthop({})".
- format(same_flag, ipv4_dst, nexthop_id))
-
- #
- # Program an entry in src_supervid_to_mac: super_vid --> mac
- #
- key_list = [
- self.src_supervid_to_mac.make_key([
- gc.KeyTuple('src_supervid', src_supervid)])
- ]
-
- data_list = [
- self.src_supervid_to_mac.make_data([
- gc.DataTuple('new_mac_switch', switch_mac)], "Ingress.set_super_MAC")
- ]
-
- self.src_supervid_to_mac.entry_add(self.dev_tgt, key_list, data_list);
- print(" Added an entry to src_supervid_to_mac: VLAN IF{} --> super_mac({})".
- format(src_supervid, switch_mac))
-
- #
- # Program an entry in dstIP_to_dstvid: dst_ip --> dst_vid
- #
- key_list = [
- self.dstIP_to_dstvid.make_key([
- gc.KeyTuple('meta.dst_ipv4', ipv4_dst)])
- ]
-
- data_list = [
- self.dstIP_to_dstvid.make_data([
- gc.DataTuple('subvid', dst_subvid),
- gc.DataTuple('supervid', dst_supervid)], "Ingress.set_dst_vid")
- ]
-
- self.dstIP_to_dstvid.entry_add(self.dev_tgt, key_list, data_list);
- print(" Added an entry to dstIP_to_dstvid: dst_ip{} --> set_dst_vid({},{})".
- format(ipv4_dst, dst_subvid, dst_supervid))
-
- #
- # Program an entry in Port_to_srcvid: dst_ip --> dst_vid
- #
- key_list = [
- self.Port_to_srcvid.make_key([
- gc.KeyTuple('ig_intr_md.ingress_port', ingress_port)])
- ]
-
- data_list = [
- self.Port_to_srcvid.make_data([
- gc.DataTuple('subvid', src_subvid),
- gc.DataTuple('supervid', src_supervid)], "Ingress.set_src_vid")
- ]
-
- self.Port_to_srcvid.entry_add(self.dev_tgt, key_list, data_list);
- print(" Added an entry to Port_to_srcvid: dst_ip{} --> set_src_vid({},{})".
- format(ingress_port, src_subvid, src_supervid))
-
- #
- # Program an entry in is_l2_switch: dst_mac --> l2_switch
- #
- key_list = [
- self.is_l2_switch.make_key([
- gc.KeyTuple('hdr.ethernet.dst_addr', dst_addr)])
- ]
-
- data_list = [
- self.is_l2_switch.make_data([], "Ingress.set_l3_flag")
- ]
-
- self.is_l2_switch.entry_add(self.dev_tgt, key_list, data_list);
- print(" Added an entry to is_l2_switch: {} --> l3_switch()".
- format(dst_addr))
-
- key_list = [
- self.egr_vlan_port.make_key([
- gc.KeyTuple('meta.vid', src_subvid),
- gc.KeyTuple('egress_port', egress_port)])
- ]
- data_list = [
- self.egr_vlan_port.make_data([], "Egress.send_untagged")
- ]
-
- self.egr_vlan_port.entry_add(self.dev_tgt, key_list, data_list);
-
- #
- # Prepare the test packet and the expected packet
- send_pkt = simple_tcp_packet(eth_dst=dst_addr,
- eth_src='00:00:02:00:00:01',
- ip_dst=ipv4_dst,
- ip_id=101,
- ip_ttl=64,
- ip_ihl=5)
- exp_pkt = copy.deepcopy(send_pkt)
- exp_pkt[Ether].dst = new_mac_da
- exp_pkt[Ether].src = new_mac_sa
- exp_pkt[IP].ttl -= 1
-
- #
- # Send a test packet
- #
- print(" Sending packet with IPv4 DST ADDR={} into port {}"
- .format(ipv4_dst, ingress_port))
-
- send_packet(self, ingress_port, send_pkt)
-
- #
- # Wait for the egress packet and verify it
- #
- print(" Expecting the modified packet to be forwarded to port {}"
- .format(egress_port))
-
- verify_packet(self, exp_pkt, egress_port)
- print(" Modified Packet received of port {}"
- .format(egress_port))
-
-
-class TrunkForward(BaseProgramTest):
- def runTest(self):
- ingress_port = test_param_get("ingress_port", 0)
- ipv4_dst = test_param_get("ipv4_dst", "192.168.2.1")
- dst_addr = test_param_get("dst_addr", "01:00:02:00:00:01")
- egress_port = test_param_get("egress_port", 5)
-
- print("\n")
- print("Test Run")
- print("========")
-
- #
- # Program an entry in Trunk_switch: dst_mac --> set_trunk_port
- #
- key_list = [
- self.Trunk_switch.make_key([
- gc.KeyTuple('hdr.ethernet.dst_addr', dst_addr)])
- ]
-
- data_list = [
- self.Trunk_switch.make_data([
- gc.DataTuple('port', egress_port)], "Ingress.set_trunk_port")
- ]
-
- self.Trunk_switch.entry_add(self.dev_tgt, key_list, data_list);
- print(" Added an entry to Trunk_switch: {} --> set_trunk_port({})".
- format(dst_addr, egress_port))
-
- #
- # Send a test packet
- #
- print(" Sending packet with IPv4 DST ADDR={} into port {}"
- .format(ipv4_dst, ingress_port))
- pkt = simple_tcp_packet(eth_dst=dst_addr,
- eth_src='00:00:02:00:00:01',
- ip_dst=ipv4_dst,
- ip_id=101,
- ip_ttl=64,
- ip_ihl=5)
- send_packet(self, ingress_port, pkt)
-
- #
- # Wait for the egress packet and verify it
- #
- print(" Expecting the packet to be forwarded to port {}"
- .format(egress_port))
- verify_packet(self, pkt, egress_port)
- print(" Packet received of port {}".format(egress_port))
-
-class HostArp(BaseProgramTest):
- def runTest(self):
- ingress_port = test_param_get("ingress_port", 0)
- ipv4_dst = test_param_get("ipv4_dst", "192.168.1.13")
- #dst_addr = test_param_get("dst_addr", "00:00:02:00:00:01")
- dst_addr = test_param_get("dst_addr", "FF:FF:FF:FF:FF:FF")
- new_mac_da = test_param_get("new_mac_da", "00:00:03:00:00:03")
- new_mac_sa = test_param_get("new_mac_sa", "00:00:00:00:00:01")
- egress_port = test_param_get("egress_port", 4)
- nexthop_id = test_param_get("nexthop_id", 101)
- same_flag = test_param_get("same_flag", False)
- src_supervid = test_param_get("src_supervid", 10)
- switch_mac = test_param_get("switch_mac", "00:00:00:00:00:01")
- dst_subvid = 3
- dst_supervid = 10
- src_subvid = 2
- src_supervid = 10
- print("\n")
- print("Test Run")
- print("========")
-
- #
- # Program an entry in nexthop:
- # nexthop_id --> l3_switch(egress_port. new_mac_da, new_mac_da)
- #
- key_list = [
- self.nexthop.make_key([
- gc.KeyTuple('nexthop_id', nexthop_id)])
- ]
-
- data_list = [
- self.nexthop.make_data([gc.DataTuple('port', egress_port),
- gc.DataTuple('new_mac_da', new_mac_da),
- gc.DataTuple('new_mac_sa', new_mac_sa)],
- "Ingress.l3_switch")
- ]
-
- self.nexthop.entry_add(self.dev_tgt, key_list, data_list);
- print(" Added an entry to nexthop: {} --> send({})".
- format(nexthop_id, egress_port))
-
- #
- # Program an entry in IPv4: ipv4_dst --> set_nexthop(nexthop_id)
- #
- key_list = [
- self.ipv4_host.make_key([
- gc.KeyTuple('same_flag', same_flag),
- gc.KeyTuple('meta.dst_ipv4', ipv4_dst)])
- ]
-
- data_list = [
- self.ipv4_host.make_data([
- gc.DataTuple('nexthop', nexthop_id)], "Ingress.set_nexthop")
- ]
-
- self.ipv4_host.entry_add(self.dev_tgt, key_list, data_list);
- print(" Added an entry to ipv4_host: {},{} --> set_nexthop({})".
- format(same_flag, ipv4_dst, nexthop_id))
-
- #
- # Program an entry in src_supervid_to_mac: super_vid --> mac
- #
- key_list = [
- self.src_supervid_to_mac.make_key([
- gc.KeyTuple('src_supervid', src_supervid)])
- ]
-
- data_list = [
- self.src_supervid_to_mac.make_data([
- gc.DataTuple('new_mac_switch', switch_mac)], "Ingress.set_super_MAC")
- ]
-
- self.src_supervid_to_mac.entry_add(self.dev_tgt, key_list, data_list);
- print(" Added an entry to src_supervid_to_mac: VLAN IF{} --> super_mac({})".
- format(src_supervid, switch_mac))
-
- #
- # Program an entry in dstIP_to_dstvid: dst_ip --> dst_vid
- #
- key_list = [
- self.dstIP_to_dstvid.make_key([
- gc.KeyTuple('meta.dst_ipv4', ipv4_dst)])
- ]
-
- data_list = [
- self.dstIP_to_dstvid.make_data([
- gc.DataTuple('subvid', dst_subvid),
- gc.DataTuple('supervid', dst_supervid)], "Ingress.set_dst_vid")
- ]
-
- self.dstIP_to_dstvid.entry_add(self.dev_tgt, key_list, data_list);
- print(" Added an entry to dstIP_to_dstvid: dst_ip{} --> set_dst_vid({},{})".
- format(ipv4_dst, dst_subvid, dst_supervid))
-
- #
- # Program an entry in Port_to_srcvid: ingress_port --> src_vid
- #
- key_list = [
- self.Port_to_srcvid.make_key([
- gc.KeyTuple('ig_intr_md.ingress_port', ingress_port)])
- ]
-
- data_list = [
- self.Port_to_srcvid.make_data([
- gc.DataTuple('subvid', src_subvid),
- gc.DataTuple('supervid', src_supervid)], "Ingress.set_src_vid")
- ]
-
- self.Port_to_srcvid.entry_add(self.dev_tgt, key_list, data_list);
- print(" Added an entry to Port_to_srcvid: dst_ip{} --> set_src_vid({},{})".
- format(ingress_port, src_subvid, src_supervid))
-
- #
- # Program an entry in is_l2_switch: dst_mac --> l2_switch
- #
- key_list = [
- self.is_l2_switch.make_key([
- gc.KeyTuple('hdr.ethernet.dst_addr', dst_addr)])
- ]
-
- data_list = [
- self.is_l2_switch.make_data([], "Ingress.set_l3_flag")
- ]
-
- self.is_l2_switch.entry_add(self.dev_tgt, key_list, data_list);
- print(" Added an entry to is_l2_switch: {} --> l3_switch()".
- format(dst_addr))
-
- # Prepare the test packet and the expected packet
-
- pkt = simple_arp_packet(ip_tgt=ipv4_dst, pktlen=100)
- send_packet(self, ingress_port, pkt)
-
- arp_mac = "00:00:00:00:00:01"
- exp_pkt = copy.deepcopy(pkt)
- exp_pkt[Ether].dst = pkt[ARP].hwsrc
- exp_pkt[Ether].src = arp_mac
- exp_pkt[ARP].op = 2
- exp_pkt[ARP].hwsrc = arp_mac
- exp_pkt[ARP].psrc = ipv4_dst
- exp_pkt[ARP].hwdst = pkt[ARP].hwsrc
- exp_pkt[ARP].pdst = pkt[ARP].psrc
- print("Expecting ARP Reply for {} at port {}".
- format(ipv4_dst, ingress_port))
- verify_packet(self, exp_pkt, ingress_port)
-
- print("IP address {} is at {}".
- format(ipv4_dst, arp_mac))
-
-
-class LpmARP(BaseProgramTest):
- def runTest(self):
- ingress_port = test_param_get("ingress_port", 0)
- lpm_addr = test_param_get("lpm_addr", "192.168.1.0/24")
- ipv4_dst = test_param_get("ipv4_dst", "192.168.1.13")
- #dst_addr = test_param_get("dst_addr", "00:00:02:00:00:01")
- dst_addr = test_param_get("dst_addr", "FF:FF:FF:FF:FF:FF")
- new_mac_da = test_param_get("new_mac_da", "00:00:03:00:00:03")
- new_mac_sa = test_param_get("new_mac_sa", "00:00:00:00:00:01")
- egress_port = test_param_get("egress_port", 4)
- nexthop_id = test_param_get("nexthop_id", 101)
- same_flag = test_param_get("same_flag", False)
- src_supervid = test_param_get("src_supervid", 10)
- switch_mac = test_param_get("switch_mac", "00:00:00:00:00:01")
- dst_subvid = 3
- dst_supervid = 10
- src_subvid = 2
- src_supervid = 10
- arp_mac = "00:00:00:00:00:01"
- (lpm_ipv4, lpm_p_len) = lpm_addr.split("/")
-
- print("\n")
- print("Test Run")
- print("========")
-
- #
- # Program an entry in nexthop:
- # nexthop_id --> l3_switch(egress_port. new_mac_da, new_mac_da)
- #
- key_list = [
- self.nexthop.make_key([
- gc.KeyTuple('nexthop_id', nexthop_id)])
- ]
-
- data_list = [
- self.nexthop.make_data([gc.DataTuple('port', egress_port),
- gc.DataTuple('new_mac_da', new_mac_da),
- gc.DataTuple('new_mac_sa', new_mac_sa)],
- "Ingress.l3_switch")
- ]
-
- self.nexthop.entry_add(self.dev_tgt, key_list, data_list);
- print(" Added an entry to nexthop: {} --> send({})".
- format(nexthop_id, egress_port))
-
- #
- # Program an entry in ipv4_lpm:
- # lpm_addr/prefix --> set_nexthop(nexthop_id)
- # Note the extra named argument (prefix_len) to gc.KeyTuple().
- #
- key_list = [
- self.ipv4_lpm.make_key([
- gc.KeyTuple('same_flag', same_flag),
- gc.KeyTuple('meta.dst_ipv4',
- value=lpm_ipv4, prefix_len=int(lpm_p_len))])
- ]
-
- data_list = [
- self.ipv4_lpm.make_data([
- gc.DataTuple('nexthop', nexthop_id)], "Ingress.set_nexthop")
- ]
-
- self.ipv4_lpm.entry_add(self.dev_tgt, key_list, data_list);
- print(" Added an entry to ipv4_lpm: {}/{} --> set_nexthop({})".format(
- lpm_ipv4, lpm_p_len, nexthop_id))
-
-
- #
- # Program an entry in src_supervid_to_mac: super_vid --> mac
- #
- key_list = [
- self.src_supervid_to_mac.make_key([
- gc.KeyTuple('src_supervid', src_supervid)])
- ]
-
- data_list = [
- self.src_supervid_to_mac.make_data([
- gc.DataTuple('new_mac_switch', switch_mac)], "Ingress.set_super_MAC")
- ]
-
- self.src_supervid_to_mac.entry_add(self.dev_tgt, key_list, data_list);
- print(" Added an entry to src_supervid_to_mac: VLAN IF{} --> super_mac({})".
- format(src_supervid, switch_mac))
-
- #
- # Program an entry in dstIP_to_dstvid: dst_ip --> dst_vid
- #
- key_list = [
- self.dstIP_to_dstvid.make_key([
- gc.KeyTuple('meta.dst_ipv4', ipv4_dst)])
- ]
-
- data_list = [
- self.dstIP_to_dstvid.make_data([
- gc.DataTuple('subvid', dst_subvid),
- gc.DataTuple('supervid', dst_supervid)], "Ingress.set_dst_vid")
- ]
-
- self.dstIP_to_dstvid.entry_add(self.dev_tgt, key_list, data_list);
- print(" Added an entry to dstIP_to_dstvid: dst_ip{} --> set_dst_vid({},{})".
- format(ipv4_dst, dst_subvid, dst_supervid))
-
- #
- # Program an entry in Port_to_srcvid: ingress_port --> src_vid
- #
- key_list = [
- self.Port_to_srcvid.make_key([
- gc.KeyTuple('ig_intr_md.ingress_port', ingress_port)])
- ]
-
- data_list = [
- self.Port_to_srcvid.make_data([
- gc.DataTuple('subvid', src_subvid),
- gc.DataTuple('supervid', src_supervid)], "Ingress.set_src_vid")
- ]
-
- self.Port_to_srcvid.entry_add(self.dev_tgt, key_list, data_list);
- print(" Added an entry to Port_to_srcvid: dst_ip{} --> set_src_vid({},{})".
- format(ingress_port, src_subvid, src_supervid))
-
- #
- # Program an entry in is_l2_switch: dst_mac --> l2_switch
- #
- key_list = [
- self.is_l2_switch.make_key([
- gc.KeyTuple('hdr.ethernet.dst_addr', dst_addr)])
- ]
-
- data_list = [
- self.is_l2_switch.make_data([], "Ingress.set_l3_flag")
- ]
-
- self.is_l2_switch.entry_add(self.dev_tgt, key_list, data_list);
- print(" Added an entry to is_l2_switch: {} --> l3_switch()".
- format(dst_addr))
-
- # Prepare the test packet and the expected packet
-
- pkt = simple_arp_packet(ip_tgt=ipv4_dst, pktlen=100)
- send_packet(self, ingress_port, pkt)
-
- exp_pkt = copy.deepcopy(pkt)
- exp_pkt[Ether].dst = pkt[ARP].hwsrc
- exp_pkt[Ether].src = arp_mac
- exp_pkt[ARP].op = 2
- exp_pkt[ARP].hwsrc = arp_mac
- exp_pkt[ARP].psrc = ipv4_dst
- exp_pkt[ARP].hwdst = pkt[ARP].hwsrc
- exp_pkt[ARP].pdst = pkt[ARP].psrc
- print("Expecting ARP Reply for {} at port {}".
- format(ipv4_dst, ingress_port))
- verify_packet(self, exp_pkt, ingress_port)
-
- print("IP address {} is at {}".
- format(ipv4_dst, arp_mac))