"""Ryu OpenFlow 1.3 application that classifies packet-ins and steers them.

TCP/UDP-over-IPv4 packets sent up by the switch are run through pre-trained
classifiers.  Packets judged benign are forwarded (optionally onto a separate
QoS port), packets judged malicious get a drop flow installed.
"""

import csv
import http.client
import json
import re
import urllib.error
import urllib.request
from threading import Timer

import joblib
import redis
from ryu.base import app_manager
from ryu.controller import ofp_event
from ryu.controller.handler import CONFIG_DISPATCHER, MAIN_DISPATCHER
from ryu.controller.handler import set_ev_cls
from ryu.lib import pcaplib
from ryu.lib.packet import arp, icmp, icmpv6, ipv4, ipv6, tcp, udp
from ryu.lib.packet import ether_types
from ryu.lib.packet import ethernet
from ryu.lib.packet import in_proto as inet
from ryu.lib.packet import packet
from ryu.ofproto import ofproto_v1_3

from model.benign_packetpredict import BenignPacketData
from model.packet_predict import PacketData
from product import redis_store
from util.redis_pool import POOL


class SimpleSwitch(app_manager.RyuApp):
    """Packet-in handler that classifies traffic and forwards or drops it.

    Classifier verdicts as used by this class: ``0`` is treated as benign,
    ``1`` as malicious.  Any other verdict is reported to the UI queue but the
    packet is not forwarded.
    """

    OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]

    # Location and file names of the models loaded with joblib at start-up.
    MODEL_DIR = "model/"
    MODEL_NAMES = ("DT.model", "RF.model", "LightGBM.model")

    # Switch agent address and port index used in the sFlow metric URL.
    SFLOW_SWITCH_IP = '10.0.1.10'
    SFLOW_PORT_ID = '3'
    # Seconds to wait for the metric endpoint before giving up.
    SFLOW_TIMEOUT = 2

    # Switch ports used for ordinary benign traffic and for rate-limited
    # (QoS) traffic.  The Redis queues "s3"/"s4" mirror these two ports.
    NORMAL_PORT = 3
    QOS_PORT = 4

    def __init__(self, *args, **kwargs):
        """Initialise the app and load every classifier model from disk."""
        super(SimpleSwitch, self).__init__(*args, **kwargs)
        self.mac_to_port = {}
        self.model_list = [
            joblib.load(self.MODEL_DIR + model_name)
            for model_name in self.MODEL_NAMES
        ]

    def add_flow(self, datapath, priority, hard_timeout, match, actions,
                 remind_content):
        """Install a flow entry on *datapath*.

        Args:
            datapath: switch connection the FlowMod is sent to.
            priority: flow priority.
            hard_timeout: hard timeout passed to the FlowMod (0 is used for
                the default table-miss entry).
            match: ``OFPMatch`` selecting the traffic.
            actions: list of actions wrapped in an APPLY_ACTIONS instruction;
                an empty list makes the entry drop matching packets.
            remind_content: short label printed when the entry is sent.
        """
        ofproto = datapath.ofproto
        ofp_parser = datapath.ofproto_parser
        inst = [ofp_parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS,
                                                 actions)]
        mod = ofp_parser.OFPFlowMod(datapath=datapath, priority=priority,
                                    hard_timeout=hard_timeout,
                                    match=match, instructions=inst)
        print("install to datapath,"+remind_content)
        datapath.send_msg(mod)

    def get_sflow_bandwidth(self):
        """Return the current ``ifinpkts`` metric of the monitored port.

        Queries the local metric endpoint, e.g.
        ``http://127.0.0.1:8008/metric/10.0.1.10/3.ifinpkts/json``.

        Returns:
            The ``metricValue`` of the first metric in the reply, or ``None``
            when the endpoint is unreachable, times out, or returns an empty
            or malformed body.
        """
        url = ("http://127.0.0.1:8008/metric/" + self.SFLOW_SWITCH_IP + '/'
               + self.SFLOW_PORT_ID + '.ifinpkts' + '/json')
        try:
            # The context manager closes the HTTP response even on error.
            with urllib.request.urlopen(url,
                                        timeout=self.SFLOW_TIMEOUT) as reply:
                data = reply.read()
        except (OSError, http.client.HTTPException) as exc:
            # URLError and socket timeouts are OSError subclasses.  A missing
            # collector must not abort packet processing.
            self.logger.warning("sFlow metric query failed: %s", exc)
            data = None

        if not data:
            print('当前数据包速率是:none' )
            return None

        try:
            value_dict = json.loads(data)[0]
            bandwidth = value_dict.get('metricValue')
        except (ValueError, LookupError, AttributeError, TypeError) as exc:
            self.logger.warning("unexpected sFlow metric reply: %s", exc)
            print('当前数据包速率是:none' )
            return None

        print('当前数据包速率是:%s' % bandwidth)
        return bandwidth

    @set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)
    def switch_features_handler(self, ev):
        """Install the table-miss entry when a switch connects.

        Packets that match no other flow entry are sent to the controller
        unbuffered.
        """
        datapath = ev.msg.datapath
        ofproto = datapath.ofproto
        ofp_parser = datapath.ofproto_parser

        match = ofp_parser.OFPMatch()
        actions = [ofp_parser.OFPActionOutput(ofproto.OFPP_CONTROLLER,
                                              ofproto.OFPCML_NO_BUFFER)]
        self.add_flow(datapath, 0, 0, match, actions, "default flow entry")

    def _select_benign_port(self, conn, app_class, bandwidth):
        """Pick the output port for a benign packet.

        The QoS port is used only when Redis holds both an ``app`` and a
        ``qos`` key, *app_class* equals the configured application and the
        measured *bandwidth* has reached the configured limit.
        """
        app_raw = conn.get("app")
        qos_raw = conn.get("qos")
        if app_raw is None or qos_raw is None:
            return self.NORMAL_PORT
        try:
            qos_app = int(app_raw)
            qos_limit = int(qos_raw)
        except ValueError:
            self.logger.warning("ignoring non-integer QoS settings: "
                                "app=%r qos=%r", app_raw, qos_raw)
            return self.NORMAL_PORT
        if app_class == qos_app and bandwidth >= qos_limit:
            return self.QOS_PORT
        return self.NORMAL_PORT

    @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
    def packet_in_handler(self, ev):
        """Classify a packet-in and forward, rate-steer or drop it.

        Only TCP and UDP packets are classified; LLDP, ARP, ICMP, ICMPv6 and
        IPv6 packets are ignored because the classifier cannot handle them.
        """
        msg = ev.msg
        datapath = msg.datapath
        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser
        in_port = msg.match['in_port']

        pkt = packet.Packet(msg.data)
        eth_pkt = pkt.get_protocol(ethernet.ethernet)
        ip_pkt = pkt.get_protocol(ipv4.ipv4)
        ipv6_pkt = pkt.get_protocol(ipv6.ipv6)
        icmp_pkt = pkt.get_protocol(icmp.icmp)
        arp_pkt = pkt.get_protocol(arp.arp)
        icmpv6_pkt = pkt.get_protocol(icmpv6.icmpv6)
        tcp_pkt = pkt.get_protocol(tcp.tcp)
        udp_pkt = pkt.get_protocol(udp.udp)

        if eth_pkt is None:
            return
        if eth_pkt.ethertype == ether_types.ETH_TYPE_LLDP:
            return  # ignore lldp packet

        # Source / destination MAC addresses.
        src = eth_pkt.src
        dst = eth_pkt.dst

        s_ip = d_ip = None
        protocol = None
        if ip_pkt:
            s_ip = ip_pkt.src
            d_ip = ip_pkt.dst
            protocol = "IP"

        # The classifier cannot process these protocols, so skip them.
        if icmp_pkt or icmpv6_pkt or ipv6_pkt or arp_pkt:
            return

        if tcp_pkt:
            protocol = 'TCP'
            s_port = tcp_pkt.src_port
            d_port = tcp_pkt.dst_port
        elif udp_pkt:
            protocol = 'UDP'
            s_port = udp_pkt.src_port
            d_port = udp_pkt.dst_port
        else:
            return

        # Query the link rate only for packets that are actually classified,
        # so ignored packets do not cost an HTTP round trip.
        bandwidth = self.get_sflow_bandwidth()
        if bandwidth is None:
            bandwidth = 0

        dpid = datapath.id
        self.mac_to_port.setdefault(dpid, {})

        # Run the malware classifier on the textual form of the message.
        s_msg = str(msg)
        s_pkt = str(pkt)
        input_p = PacketData(s_msg, s_pkt)
        pre_out = int(input_p.predict(self.model_list))
        print("is malware????")
        print(pre_out)
        type_out = pre_out + 9

        # Match on IPv4 addresses when an IPv4 header is present, otherwise
        # fall back to the MAC addresses.
        if ip_pkt:
            kwargs = dict(in_port=in_port, eth_type=ether_types.ETH_TYPE_IP,
                          ipv4_src=s_ip, ipv4_dst=d_ip)
        else:
            kwargs = dict(in_port=in_port, eth_src=src, eth_dst=dst)
        match = parser.OFPMatch(**kwargs)

        conn = redis.Redis(connection_pool=POOL)
        conn.set("first", bandwidth)

        out_port = 1
        actions = None

        # Benign packet: identify the application and choose the port.
        if pre_out == 0:
            input_benign = BenignPacketData(s_msg)
            benign_out = input_benign.predict()
            print("what's app??????")
            print(benign_out)
            type_out = int(benign_out)
            out_port = self._select_benign_port(conn, type_out, bandwidth)
            actions = [parser.OFPActionOutput(out_port)]
            # NOTE(review): the original also installed a forwarding flow
            # (priority 2, hard timeout 1800) when the verdict was 3, but that
            # check sat under the ``pre_out == 0`` test and could never fire.
            # Re-introduce it deliberately if verdict 3 is still produced.

        # Report the packet to the UI queues.
        # NOTE(review): reporting is assumed to apply to every verdict, not
        # only benign ones -- confirm against the original file layout.
        store_data = {
            "sIP": s_ip,
            "dIP": d_ip,
            "sPort": s_port,
            "dPort": d_port,
            "protocol": protocol,
            "sMac": src,
            "dMac": dst,
            "type": type_out,
        }
        redis_store("queue_test", store_data)
        if out_port == self.NORMAL_PORT:
            redis_store("s3", store_data)
        elif out_port == self.QOS_PORT:
            redis_store("s4", store_data)

        # Malicious packet: once a flow is judged malicious, install a drop
        # entry so later packets of the flow never reach the controller.
        if pre_out == 1:
            self.mac_to_port[dpid][src] = in_port
            # An empty action list drops the packet.  The instruction-type
            # constant OFPIT_CLEAR_ACTIONS is not a port number and must not
            # be passed to OFPActionOutput.
            actions = []
            self.add_flow(datapath, 3, 3600, match, actions, "drop")

        if actions is None:
            self.logger.warning("unhandled classifier verdict %s; "
                                "packet not forwarded", pre_out)
            return

        # Tell the switch what to do with this packet.  The payload is only
        # attached when the switch did not buffer it.
        data = None
        if msg.buffer_id == ofproto.OFP_NO_BUFFER:
            data = msg.data
        out = parser.OFPPacketOut(datapath=datapath, buffer_id=msg.buffer_id,
                                  in_port=in_port, actions=actions, data=data)
        datapath.send_msg(out)