summaryrefslogtreecommitdiff
path: root/ptf/test0418.py
diff options
context:
space:
mode:
Diffstat (limited to 'ptf/test0418.py')
-rw-r--r--ptf/test0418.py852
1 files changed, 852 insertions, 0 deletions
diff --git a/ptf/test0418.py b/ptf/test0418.py
new file mode 100644
index 0000000..15539ca
--- /dev/null
+++ b/ptf/test0418.py
@@ -0,0 +1,852 @@
+
+"""
+Simple PTF test for super_vlan
+"""
+
+######### STANDARD MODULE IMPORTS ########
+import unittest
+import logging
+import grpc
+import pdb
+
+######### PTF modules for BFRuntime Client Library APIs #######
+import ptf
+from ptf.testutils import *
+from bfruntime_client_base_tests import BfRuntimeTest
+import bfrt_grpc.bfruntime_pb2 as bfruntime_pb2
+import bfrt_grpc.client as gc
+
+########## Basic Initialization ############
+class BaseProgramTest(BfRuntimeTest):
+ # The setUp() method is used to prepare the test fixture. Typically
+ # you would use it to establich connection to the gRPC Server
+ #
+ # You can also put the initial device configuration there. However,
+ # if during this process an error is encountered, it will be considered
+ # as a test error (meaning the test is incorrect),
+ # rather than a test failure
+ #
+ # Here is the stuff we set up that is ready to use
+ # client_id
+ # p4_name
+ # bfrt_info
+ # dev
+ # dev_tgt
+ # allports
+ # tables -- the list of tables
+ # Individual tables of the program with short names
+ # ipv4_host
+ # ipv4_lpm
+ # nexthop
+ def setUp(self):
+ self.client_id = 0
+ self.p4_name = "super_vlan"
+ self.dev = 0
+ self.dev_tgt = gc.Target(self.dev, pipe_id=0xFFFF)
+
+ print("\n")
+ print("Test Setup")
+ print("==========")
+
+ BfRuntimeTest.setUp(self, self.client_id, self.p4_name)
+ self.bfrt_info = self.interface.bfrt_info_get(self.p4_name)
+
+ print(" Connected to Device: {}, Program: {}, ClientId: {}".format(
+ self.dev, self.p4_name, self.client_id))
+
+ # Since this class is not a test per se, we can use the setup method
+ # for common setup. For example, we can have our tables and annotations
+ # ready
+ self.is_l2_switch = self.bfrt_info.table_get("Ingress.is_l2_switch")
+ self.is_l2_switch.info.key_field_annotation_add(
+ "hdr.ethernet.dst_addr", "mac")
+
+ self.Trunk_switch = self.bfrt_info.table_get("Ingress.Trunk_switch")
+ self.Trunk_switch.info.key_field_annotation_add(
+ "hdr.ethernet.dst_addr", "mac")
+
+ self.Port_to_srcvid = self.bfrt_info.table_get("Ingress.Port_to_srcvid")
+ self.Port_to_srcvid.info.key_field_annotation_add(
+ "ig_intr_md.ingress_port", "int")
+
+ self.dstIP_to_dstvid = self.bfrt_info.table_get("Ingress.dstIP_to_dstvid")
+ self.dstIP_to_dstvid.info.key_field_annotation_add(
+ "meta.dst_ipv4", "ipv4")
+
+ self.src_supervid_to_mac = self.bfrt_info.table_get("Ingress.src_supervid_to_mac")
+ self.src_supervid_to_mac.info.data_field_annotation_add(
+ "new_mac_switch", "Ingress.set_super_MAC", "mac")
+
+ self.ipv4_host = self.bfrt_info.table_get("Ingress.ipv4_host")
+ self.ipv4_host.info.key_field_annotation_add(
+ "meta.dst_ipv4", "ipv4")
+
+ self.ipv4_lpm = self.bfrt_info.table_get("Ingress.ipv4_lpm")
+ self.ipv4_lpm.info.key_field_annotation_add(
+ "meta.dst_ipv4", "ipv4")
+
+ self.nexthop = self.bfrt_info.table_get("Ingress.nexthop")
+ self.nexthop.info.data_field_annotation_add(
+ "new_mac_da", "Ingress.l3_switch", "mac")
+ self.nexthop.info.data_field_annotation_add(
+ "new_mac_sa", "Ingress.l3_switch", "mac")
+
+ self.egr_vlan_port = self.bfrt_info.table_get("Egress.egr_vlan_port")
+ self.egr_vlan_port.info.key_field_annotation_add(
+ "meta.vid", "int")
+
+
+ self.tables = [ self.is_l2_switch, self.Trunk_switch, self.Port_to_srcvid, self.dstIP_to_dstvid, self.src_supervid_to_mac, self.ipv4_host, self.ipv4_lpm, self.nexthop, self.egr_vlan_port ]
+
+ # Create a list of all ports available on the device
+ self.swports = []
+ for (device, port, ifname) in ptf.config['interfaces']:
+ self.swports.append(port)
+ self.swports.sort()
+
+ # Optional, but highly recommended
+ self.cleanUp()
+
+ # Use Cleanup Method to clear the tables before and after the test starts
+ # (the latter is done as a part of tearDown()
+ def cleanUp(self):
+ print("\n")
+ print("Table Cleanup:")
+ print("==============")
+
+ try:
+ for t in self.tables:
+ print(" Clearing Table {}".format(t.info.name_get()))
+ keys = []
+ for (d, k) in t.entry_get(self.dev_tgt):
+ if k is not None:
+ keys.append(k)
+ t.entry_del(self.dev_tgt, keys)
+ # Not all tables support default entry
+ try:
+ t.default_entry_reset(self.dev_tgt)
+ except:
+ pass
+ except Exception as e:
+ print("Error cleaning up: {}".format(e))
+
+ # Use tearDown() method to return the DUT to the initial state by cleaning
+ # all the configuration and clearing up the connection
+ def tearDown(self):
+ print("\n")
+ print("Test TearDown:")
+ print("==============")
+
+ self.cleanUp()
+
+ # Call the Parent tearDown
+ BfRuntimeTest.tearDown(self)
+
+#
+# Individual tests can now be subclassed from BaseProgramTest
+#
+
+############################################################################
+################# I N D I V I D U A L T E S T S #########################
+############################################################################
+
+class HostSend(BaseProgramTest):
+ def runTest(self):
+ ingress_port = test_param_get("ingress_port", 0)
+ ipv4_dst = test_param_get("ipv4_dst", "192.168.1.2")
+ dst_addr = test_param_get("dst_addr", "00:00:02:00:00:02")
+ egress_port = test_param_get("egress_port", 4)
+ nexthop_id = test_param_get("nexthop_id", 100)
+ same_flag = test_param_get("same_flag", True)
+
+ print("\n")
+ print("Test Run")
+ print("========")
+
+ #
+ # Program an entry in nexthop: nexthop_id --> send(egress_port)
+ #
+ key_list = [
+ self.nexthop.make_key([
+ gc.KeyTuple('nexthop_id', nexthop_id)])
+ ]
+
+ data_list = [
+ self.nexthop.make_data([], "Ingress.send")
+ ]
+
+ self.nexthop.entry_add(self.dev_tgt, key_list, data_list);
+ print(" Added an entry to nexthop: {} --> send()".format(
+ nexthop_id))
+
+ #
+ # Program an entry in ipv4_host: ipv4_dst --> set_nexthop(nexthop_id)
+ #
+
+ key_list = [
+ self.ipv4_host.make_key([
+ gc.KeyTuple('same_flag', same_flag),
+ gc.KeyTuple('meta.dst_ipv4', ipv4_dst)])
+ ]
+
+ data_list = [
+ self.ipv4_host.make_data([
+ gc.DataTuple('nexthop', nexthop_id)], "Ingress.set_nexthop")
+ ]
+
+ self.ipv4_host.entry_add(self.dev_tgt, key_list, data_list);
+ print(" Added an entry to ipv4_host: {},{} --> set_nexthop({})".
+ format(same_flag, ipv4_dst, nexthop_id))
+
+ #
+ # Program an entry in is_l2_switch: dst_mac --> l2_switch
+ #
+ key_list = [
+ self.is_l2_switch.make_key([
+ gc.KeyTuple('hdr.ethernet.dst_addr', dst_addr)])
+ ]
+
+ data_list = [
+ self.is_l2_switch.make_data([
+ gc.DataTuple('port', egress_port)], "Ingress.l2_switch")
+ ]
+
+ self.is_l2_switch.entry_add(self.dev_tgt, key_list, data_list);
+ print(" Added an entry to is_l2_switch: {} --> l2_switch({})".
+ format(dst_addr, egress_port))
+
+ #
+ # Send a test packet
+ #
+ print(" Sending packet with IPv4 DST ADDR={} into port {}"
+ .format(ipv4_dst, ingress_port))
+ pkt = simple_tcp_packet(eth_dst=dst_addr,
+ eth_src='00:00:02:00:00:01',
+ ip_dst=ipv4_dst,
+ ip_id=101,
+ ip_ttl=64,
+ ip_ihl=5)
+ send_packet(self, ingress_port, pkt)
+
+ #
+ # Wait for the egress packet and verify it
+ #
+ print(" Expecting the packet to be forwarded to port {}"
+ .format(egress_port))
+ verify_packet(self, pkt, egress_port)
+ print(" Packet received of port {}".format(egress_port))
+
+
+class HostDrop(BaseProgramTest):
+ def runTest(self):
+ ingress_port = test_param_get("ingress_port", 0)
+ ipv4_dst = test_param_get("ipv4_dst", "192.168.1.3")
+ dst_addr = test_param_get("dst_addr", "00:00:02:00:00:03")
+ egress_port = test_param_get("egress_port", 5)
+ nexthop_id = test_param_get("nexthop_id", 102)
+ same_flag = test_param_get("same_flag", True)
+
+ print("\n")
+ print("Test Run")
+ print("========")
+
+ #
+ # Program an entry in nexthop: nexthop_id --> drop()
+ #
+ key_list = [
+ self.nexthop.make_key([
+ gc.KeyTuple('nexthop_id', nexthop_id)])
+ ]
+
+ data_list = [
+ self.nexthop.make_data([], "Ingress.drop")
+ ]
+
+ self.nexthop.entry_add(self.dev_tgt, key_list, data_list);
+ print(" Added an entry to nexthop: {} --> drop()".
+ format(nexthop_id))
+
+ #
+ # Program an entry in ipv4_host: ipv4_dst --> set_nexthop(nexthop_id)
+ #
+ key_list = [
+ self.ipv4_host.make_key([
+ gc.KeyTuple('same_flag', same_flag),
+ gc.KeyTuple('meta.dst_ipv4', ipv4_dst)])
+ ]
+
+ data_list = [
+ self.ipv4_host.make_data([
+ gc.DataTuple('nexthop', nexthop_id)], "Ingress.set_nexthop")
+ ]
+
+ self.ipv4_host.entry_add(self.dev_tgt, key_list, data_list);
+ print(" Added an entry to ipv4_host: {},{} --> set_nexthop({})".
+ format(same_flag, ipv4_dst, nexthop_id))
+
+ #
+ # Program an entry in is_l2_switch: dst_mac --> l2_switch
+ #
+ key_list = [
+ self.is_l2_switch.make_key([
+ gc.KeyTuple('hdr.ethernet.dst_addr', dst_addr)])
+ ]
+
+ data_list = [
+ self.is_l2_switch.make_data([
+ gc.DataTuple('port', egress_port)], "Ingress.l2_switch")
+ ]
+
+ self.is_l2_switch.entry_add(self.dev_tgt, key_list, data_list);
+ print(" Added an entry to is_l2_switch: {} --> l2_switch({})".
+ format(dst_addr, egress_port))
+
+
+ #
+ # Send a test packet
+ #
+ print(" Sending packet with IPv4 DST ADDR={} into port {}"
+ .format(ipv4_dst, ingress_port))
+ pkt = simple_tcp_packet(eth_dst=dst_addr,
+ eth_src='00:00:02:00:00:01',
+ ip_dst=ipv4_dst,
+ ip_id=101,
+ ip_ttl=64,
+ ip_ihl=5)
+ send_packet(self, ingress_port, pkt)
+ print(" Expecting No packets anywhere")
+
+ #
+ # Wait to make sure no packet egresses anywhere. self.swports is the
+ # list of all ports the test is using. It is set up in the setUp()
+ # method of the parent class
+ #
+ verify_no_packet_any(self, pkt, self.swports)
+ print(" No packets received")
+
+
+class HostL3Switch(BaseProgramTest):
+ def runTest(self):
+ ingress_port = test_param_get("ingress_port", 0)
+ ipv4_dst = test_param_get("ipv4_dst", "192.168.1.13")
+ dst_addr = test_param_get("dst_addr", "00:00:00:00:00:01")
+ new_mac_da = test_param_get("new_mac_da", "00:00:03:00:00:03")
+ new_mac_sa = test_param_get("new_mac_sa", "00:00:00:00:00:01")
+ egress_port = test_param_get("egress_port", 4)
+ nexthop_id = test_param_get("nexthop_id", 101)
+ same_flag = test_param_get("same_flag", False)
+ src_supervid = test_param_get("src_supervid", 10)
+ switch_mac = test_param_get("switch_mac", "00:00:00:00:00:01")
+ dst_subvid = 3
+ dst_supervid = 10
+ src_subvid = 2
+ src_supervid = 10
+
+ print("\n")
+ print("Test Run")
+ print("========")
+
+ #
+ # Program an entry in nexthop:
+ # nexthop_id --> l3_switch(egress_port. new_mac_da, new_mac_da)
+ #
+ key_list = [
+ self.nexthop.make_key([
+ gc.KeyTuple('nexthop_id', nexthop_id)])
+ ]
+
+ data_list = [
+ self.nexthop.make_data([gc.DataTuple('port', egress_port),
+ gc.DataTuple('new_mac_da', new_mac_da),
+ gc.DataTuple('new_mac_sa', new_mac_sa)],
+ "Ingress.l3_switch")
+ ]
+
+ self.nexthop.entry_add(self.dev_tgt, key_list, data_list);
+ print(" Added an entry to nexthop: {} --> send({})".
+ format(nexthop_id, egress_port))
+
+ #
+ # Program an entry in IPv4: ipv4_dst --> set_nexthop(nexthop_id)
+ #
+ key_list = [
+ self.ipv4_host.make_key([
+ gc.KeyTuple('same_flag', same_flag),
+ gc.KeyTuple('meta.dst_ipv4', ipv4_dst)])
+ ]
+
+ data_list = [
+ self.ipv4_host.make_data([
+ gc.DataTuple('nexthop', nexthop_id)], "Ingress.set_nexthop")
+ ]
+
+ self.ipv4_host.entry_add(self.dev_tgt, key_list, data_list);
+ print(" Added an entry to ipv4_host: {},{} --> set_nexthop({})".
+ format(same_flag, ipv4_dst, nexthop_id))
+
+ #
+ # Program an entry in src_supervid_to_mac: super_vid --> mac
+ #
+ key_list = [
+ self.src_supervid_to_mac.make_key([
+ gc.KeyTuple('src_supervid', src_supervid)])
+ ]
+
+ data_list = [
+ self.src_supervid_to_mac.make_data([
+ gc.DataTuple('new_mac_switch', switch_mac)], "Ingress.set_super_MAC")
+ ]
+
+ self.src_supervid_to_mac.entry_add(self.dev_tgt, key_list, data_list);
+ print(" Added an entry to src_supervid_to_mac: VLAN IF{} --> super_mac({})".
+ format(src_supervid, switch_mac))
+
+ #
+ # Program an entry in dstIP_to_dstvid: dst_ip --> dst_vid
+ #
+ key_list = [
+ self.dstIP_to_dstvid.make_key([
+ gc.KeyTuple('meta.dst_ipv4', ipv4_dst)])
+ ]
+
+ data_list = [
+ self.dstIP_to_dstvid.make_data([
+ gc.DataTuple('subvid', dst_subvid),
+ gc.DataTuple('supervid', dst_supervid)], "Ingress.set_dst_vid")
+ ]
+
+ self.dstIP_to_dstvid.entry_add(self.dev_tgt, key_list, data_list);
+ print(" Added an entry to dstIP_to_dstvid: dst_ip{} --> set_dst_vid({},{})".
+ format(ipv4_dst, dst_subvid, dst_supervid))
+
+ #
+ # Program an entry in Port_to_srcvid: dst_ip --> dst_vid
+ #
+ key_list = [
+ self.Port_to_srcvid.make_key([
+ gc.KeyTuple('ig_intr_md.ingress_port', ingress_port)])
+ ]
+
+ data_list = [
+ self.Port_to_srcvid.make_data([
+ gc.DataTuple('subvid', src_subvid),
+ gc.DataTuple('supervid', src_supervid)], "Ingress.set_src_vid")
+ ]
+
+ self.Port_to_srcvid.entry_add(self.dev_tgt, key_list, data_list);
+ print(" Added an entry to Port_to_srcvid: dst_ip{} --> set_src_vid({},{})".
+ format(ingress_port, src_subvid, src_supervid))
+
+ #
+ # Program an entry in is_l2_switch: dst_mac --> l2_switch
+ #
+ key_list = [
+ self.is_l2_switch.make_key([
+ gc.KeyTuple('hdr.ethernet.dst_addr', dst_addr)])
+ ]
+
+ data_list = [
+ self.is_l2_switch.make_data([], "Ingress.set_l3_flag")
+ ]
+
+ self.is_l2_switch.entry_add(self.dev_tgt, key_list, data_list);
+ print(" Added an entry to is_l2_switch: {} --> l3_switch()".
+ format(dst_addr))
+
+ key_list = [
+ self.egr_vlan_port.make_key([
+ gc.KeyTuple('meta.vid', src_subvid),
+ gc.KeyTuple('egress_port', egress_port)])
+ ]
+ data_list = [
+ self.egr_vlan_port.make_data([], "Egress.send_untagged")
+ ]
+
+ self.egr_vlan_port.entry_add(self.dev_tgt, key_list, data_list);
+
+ #
+ # Prepare the test packet and the expected packet
+ send_pkt = simple_tcp_packet(eth_dst=dst_addr,
+ eth_src='00:00:02:00:00:01',
+ ip_dst=ipv4_dst,
+ ip_id=101,
+ ip_ttl=64,
+ ip_ihl=5)
+ exp_pkt = copy.deepcopy(send_pkt)
+ exp_pkt[Ether].dst = new_mac_da
+ exp_pkt[Ether].src = new_mac_sa
+ exp_pkt[IP].ttl -= 1
+
+ #
+ # Send a test packet
+ #
+ print(" Sending packet with IPv4 DST ADDR={} into port {}"
+ .format(ipv4_dst, ingress_port))
+
+ send_packet(self, ingress_port, send_pkt)
+
+ #
+ # Wait for the egress packet and verify it
+ #
+ print(" Expecting the modified packet to be forwarded to port {}"
+ .format(egress_port))
+
+ verify_packet(self, exp_pkt, egress_port)
+ print(" Modified Packet received of port {}"
+ .format(egress_port))
+
+
+class TrunkForward(BaseProgramTest):
+ def runTest(self):
+ ingress_port = test_param_get("ingress_port", 0)
+ ipv4_dst = test_param_get("ipv4_dst", "192.168.2.1")
+ dst_addr = test_param_get("dst_addr", "01:00:02:00:00:01")
+ egress_port = test_param_get("egress_port", 5)
+
+ print("\n")
+ print("Test Run")
+ print("========")
+
+ #
+ # Program an entry in Trunk_switch: dst_mac --> set_trunk_port
+ #
+ key_list = [
+ self.Trunk_switch.make_key([
+ gc.KeyTuple('hdr.ethernet.dst_addr', dst_addr)])
+ ]
+
+ data_list = [
+ self.Trunk_switch.make_data([
+ gc.DataTuple('port', egress_port)], "Ingress.set_trunk_port")
+ ]
+
+ self.Trunk_switch.entry_add(self.dev_tgt, key_list, data_list);
+ print(" Added an entry to Trunk_switch: {} --> set_trunk_port({})".
+ format(dst_addr, egress_port))
+
+ #
+ # Send a test packet
+ #
+ print(" Sending packet with IPv4 DST ADDR={} into port {}"
+ .format(ipv4_dst, ingress_port))
+ pkt = simple_tcp_packet(eth_dst=dst_addr,
+ eth_src='00:00:02:00:00:01',
+ ip_dst=ipv4_dst,
+ ip_id=101,
+ ip_ttl=64,
+ ip_ihl=5)
+ send_packet(self, ingress_port, pkt)
+
+ #
+ # Wait for the egress packet and verify it
+ #
+ print(" Expecting the packet to be forwarded to port {}"
+ .format(egress_port))
+ verify_packet(self, pkt, egress_port)
+ print(" Packet received of port {}".format(egress_port))
+
+class HostArp(BaseProgramTest):
+ def runTest(self):
+ ingress_port = test_param_get("ingress_port", 0)
+ ipv4_dst = test_param_get("ipv4_dst", "192.168.1.13")
+ #dst_addr = test_param_get("dst_addr", "00:00:02:00:00:01")
+ dst_addr = test_param_get("dst_addr", "FF:FF:FF:FF:FF:FF")
+ new_mac_da = test_param_get("new_mac_da", "00:00:03:00:00:03")
+ new_mac_sa = test_param_get("new_mac_sa", "00:00:00:00:00:01")
+ egress_port = test_param_get("egress_port", 4)
+ nexthop_id = test_param_get("nexthop_id", 101)
+ same_flag = test_param_get("same_flag", False)
+ src_supervid = test_param_get("src_supervid", 10)
+ switch_mac = test_param_get("switch_mac", "00:00:00:00:00:01")
+ dst_subvid = 3
+ dst_supervid = 10
+ src_subvid = 2
+ src_supervid = 10
+ print("\n")
+ print("Test Run")
+ print("========")
+
+ #
+ # Program an entry in nexthop:
+ # nexthop_id --> l3_switch(egress_port. new_mac_da, new_mac_da)
+ #
+ key_list = [
+ self.nexthop.make_key([
+ gc.KeyTuple('nexthop_id', nexthop_id)])
+ ]
+
+ data_list = [
+ self.nexthop.make_data([gc.DataTuple('port', egress_port),
+ gc.DataTuple('new_mac_da', new_mac_da),
+ gc.DataTuple('new_mac_sa', new_mac_sa)],
+ "Ingress.l3_switch")
+ ]
+
+ self.nexthop.entry_add(self.dev_tgt, key_list, data_list);
+ print(" Added an entry to nexthop: {} --> send({})".
+ format(nexthop_id, egress_port))
+
+ #
+ # Program an entry in IPv4: ipv4_dst --> set_nexthop(nexthop_id)
+ #
+ key_list = [
+ self.ipv4_host.make_key([
+ gc.KeyTuple('same_flag', same_flag),
+ gc.KeyTuple('meta.dst_ipv4', ipv4_dst)])
+ ]
+
+ data_list = [
+ self.ipv4_host.make_data([
+ gc.DataTuple('nexthop', nexthop_id)], "Ingress.set_nexthop")
+ ]
+
+ self.ipv4_host.entry_add(self.dev_tgt, key_list, data_list);
+ print(" Added an entry to ipv4_host: {},{} --> set_nexthop({})".
+ format(same_flag, ipv4_dst, nexthop_id))
+
+ #
+ # Program an entry in src_supervid_to_mac: super_vid --> mac
+ #
+ key_list = [
+ self.src_supervid_to_mac.make_key([
+ gc.KeyTuple('src_supervid', src_supervid)])
+ ]
+
+ data_list = [
+ self.src_supervid_to_mac.make_data([
+ gc.DataTuple('new_mac_switch', switch_mac)], "Ingress.set_super_MAC")
+ ]
+
+ self.src_supervid_to_mac.entry_add(self.dev_tgt, key_list, data_list);
+ print(" Added an entry to src_supervid_to_mac: VLAN IF{} --> super_mac({})".
+ format(src_supervid, switch_mac))
+
+ #
+ # Program an entry in dstIP_to_dstvid: dst_ip --> dst_vid
+ #
+ key_list = [
+ self.dstIP_to_dstvid.make_key([
+ gc.KeyTuple('meta.dst_ipv4', ipv4_dst)])
+ ]
+
+ data_list = [
+ self.dstIP_to_dstvid.make_data([
+ gc.DataTuple('subvid', dst_subvid),
+ gc.DataTuple('supervid', dst_supervid)], "Ingress.set_dst_vid")
+ ]
+
+ self.dstIP_to_dstvid.entry_add(self.dev_tgt, key_list, data_list);
+ print(" Added an entry to dstIP_to_dstvid: dst_ip{} --> set_dst_vid({},{})".
+ format(ipv4_dst, dst_subvid, dst_supervid))
+
+ #
+ # Program an entry in Port_to_srcvid: ingress_port --> src_vid
+ #
+ key_list = [
+ self.Port_to_srcvid.make_key([
+ gc.KeyTuple('ig_intr_md.ingress_port', ingress_port)])
+ ]
+
+ data_list = [
+ self.Port_to_srcvid.make_data([
+ gc.DataTuple('subvid', src_subvid),
+ gc.DataTuple('supervid', src_supervid)], "Ingress.set_src_vid")
+ ]
+
+ self.Port_to_srcvid.entry_add(self.dev_tgt, key_list, data_list);
+ print(" Added an entry to Port_to_srcvid: dst_ip{} --> set_src_vid({},{})".
+ format(ingress_port, src_subvid, src_supervid))
+
+ #
+ # Program an entry in is_l2_switch: dst_mac --> l2_switch
+ #
+ key_list = [
+ self.is_l2_switch.make_key([
+ gc.KeyTuple('hdr.ethernet.dst_addr', dst_addr)])
+ ]
+
+ data_list = [
+ self.is_l2_switch.make_data([], "Ingress.set_l3_flag")
+ ]
+
+ self.is_l2_switch.entry_add(self.dev_tgt, key_list, data_list);
+ print(" Added an entry to is_l2_switch: {} --> l3_switch()".
+ format(dst_addr))
+
+ # Prepare the test packet and the expected packet
+
+ pkt = simple_arp_packet(ip_tgt=ipv4_dst, pktlen=100)
+ send_packet(self, ingress_port, pkt)
+
+ arp_mac = "00:00:00:00:00:01"
+ exp_pkt = copy.deepcopy(pkt)
+ exp_pkt[Ether].dst = pkt[ARP].hwsrc
+ exp_pkt[Ether].src = arp_mac
+ exp_pkt[ARP].op = 2
+ exp_pkt[ARP].hwsrc = arp_mac
+ exp_pkt[ARP].psrc = ipv4_dst
+ exp_pkt[ARP].hwdst = pkt[ARP].hwsrc
+ exp_pkt[ARP].pdst = pkt[ARP].psrc
+ print("Expecting ARP Reply for {} at port {}".
+ format(ipv4_dst, ingress_port))
+ verify_packet(self, exp_pkt, ingress_port)
+
+ print("IP address {} is at {}".
+ format(ipv4_dst, arp_mac))
+
+
+class LpmARP(BaseProgramTest):
+ def runTest(self):
+ ingress_port = test_param_get("ingress_port", 0)
+ lpm_addr = test_param_get("lpm_addr", "192.168.1.0/24")
+ ipv4_dst = test_param_get("ipv4_dst", "192.168.1.13")
+ #dst_addr = test_param_get("dst_addr", "00:00:02:00:00:01")
+ dst_addr = test_param_get("dst_addr", "FF:FF:FF:FF:FF:FF")
+ new_mac_da = test_param_get("new_mac_da", "00:00:03:00:00:03")
+ new_mac_sa = test_param_get("new_mac_sa", "00:00:00:00:00:01")
+ egress_port = test_param_get("egress_port", 4)
+ nexthop_id = test_param_get("nexthop_id", 101)
+ same_flag = test_param_get("same_flag", False)
+ src_supervid = test_param_get("src_supervid", 10)
+ switch_mac = test_param_get("switch_mac", "00:00:00:00:00:01")
+ dst_subvid = 3
+ dst_supervid = 10
+ src_subvid = 2
+ src_supervid = 10
+ arp_mac = "00:00:00:00:00:01"
+ (lpm_ipv4, lpm_p_len) = lpm_addr.split("/")
+
+ print("\n")
+ print("Test Run")
+ print("========")
+
+ #
+ # Program an entry in nexthop:
+ # nexthop_id --> l3_switch(egress_port. new_mac_da, new_mac_da)
+ #
+ key_list = [
+ self.nexthop.make_key([
+ gc.KeyTuple('nexthop_id', nexthop_id)])
+ ]
+
+ data_list = [
+ self.nexthop.make_data([gc.DataTuple('port', egress_port),
+ gc.DataTuple('new_mac_da', new_mac_da),
+ gc.DataTuple('new_mac_sa', new_mac_sa)],
+ "Ingress.l3_switch")
+ ]
+
+ self.nexthop.entry_add(self.dev_tgt, key_list, data_list);
+ print(" Added an entry to nexthop: {} --> send({})".
+ format(nexthop_id, egress_port))
+
+ #
+ # Program an entry in ipv4_lpm:
+ # lpm_addr/prefix --> set_nexthop(nexthop_id)
+ # Note the extra named argument (prefix_len) to gc.KeyTuple().
+ #
+ key_list = [
+ self.ipv4_lpm.make_key([
+ gc.KeyTuple('same_flag', same_flag),
+ gc.KeyTuple('meta.dst_ipv4',
+ value=lpm_ipv4, prefix_len=int(lpm_p_len))])
+ ]
+
+ data_list = [
+ self.ipv4_lpm.make_data([
+ gc.DataTuple('nexthop', nexthop_id)], "Ingress.set_nexthop")
+ ]
+
+ self.ipv4_lpm.entry_add(self.dev_tgt, key_list, data_list);
+ print(" Added an entry to ipv4_lpm: {}/{} --> set_nexthop({})".format(
+ lpm_ipv4, lpm_p_len, nexthop_id))
+
+
+ #
+ # Program an entry in src_supervid_to_mac: super_vid --> mac
+ #
+ key_list = [
+ self.src_supervid_to_mac.make_key([
+ gc.KeyTuple('src_supervid', src_supervid)])
+ ]
+
+ data_list = [
+ self.src_supervid_to_mac.make_data([
+ gc.DataTuple('new_mac_switch', switch_mac)], "Ingress.set_super_MAC")
+ ]
+
+ self.src_supervid_to_mac.entry_add(self.dev_tgt, key_list, data_list);
+ print(" Added an entry to src_supervid_to_mac: VLAN IF{} --> super_mac({})".
+ format(src_supervid, switch_mac))
+
+ #
+ # Program an entry in dstIP_to_dstvid: dst_ip --> dst_vid
+ #
+ key_list = [
+ self.dstIP_to_dstvid.make_key([
+ gc.KeyTuple('meta.dst_ipv4', ipv4_dst)])
+ ]
+
+ data_list = [
+ self.dstIP_to_dstvid.make_data([
+ gc.DataTuple('subvid', dst_subvid),
+ gc.DataTuple('supervid', dst_supervid)], "Ingress.set_dst_vid")
+ ]
+
+ self.dstIP_to_dstvid.entry_add(self.dev_tgt, key_list, data_list);
+ print(" Added an entry to dstIP_to_dstvid: dst_ip{} --> set_dst_vid({},{})".
+ format(ipv4_dst, dst_subvid, dst_supervid))
+
+ #
+ # Program an entry in Port_to_srcvid: ingress_port --> src_vid
+ #
+ key_list = [
+ self.Port_to_srcvid.make_key([
+ gc.KeyTuple('ig_intr_md.ingress_port', ingress_port)])
+ ]
+
+ data_list = [
+ self.Port_to_srcvid.make_data([
+ gc.DataTuple('subvid', src_subvid),
+ gc.DataTuple('supervid', src_supervid)], "Ingress.set_src_vid")
+ ]
+
+ self.Port_to_srcvid.entry_add(self.dev_tgt, key_list, data_list);
+ print(" Added an entry to Port_to_srcvid: dst_ip{} --> set_src_vid({},{})".
+ format(ingress_port, src_subvid, src_supervid))
+
+ #
+ # Program an entry in is_l2_switch: dst_mac --> l2_switch
+ #
+ key_list = [
+ self.is_l2_switch.make_key([
+ gc.KeyTuple('hdr.ethernet.dst_addr', dst_addr)])
+ ]
+
+ data_list = [
+ self.is_l2_switch.make_data([], "Ingress.set_l3_flag")
+ ]
+
+ self.is_l2_switch.entry_add(self.dev_tgt, key_list, data_list);
+ print(" Added an entry to is_l2_switch: {} --> l3_switch()".
+ format(dst_addr))
+
+ # Prepare the test packet and the expected packet
+
+ pkt = simple_arp_packet(ip_tgt=ipv4_dst, pktlen=100)
+ send_packet(self, ingress_port, pkt)
+
+ exp_pkt = copy.deepcopy(pkt)
+ exp_pkt[Ether].dst = pkt[ARP].hwsrc
+ exp_pkt[Ether].src = arp_mac
+ exp_pkt[ARP].op = 2
+ exp_pkt[ARP].hwsrc = arp_mac
+ exp_pkt[ARP].psrc = ipv4_dst
+ exp_pkt[ARP].hwdst = pkt[ARP].hwsrc
+ exp_pkt[ARP].pdst = pkt[ARP].psrc
+ print("Expecting ARP Reply for {} at port {}".
+ format(ipv4_dst, ingress_port))
+ verify_packet(self, exp_pkt, ingress_port)
+
+ print("IP address {} is at {}".
+ format(ipv4_dst, arp_mac))