summaryrefslogtreecommitdiff
path: root/ryu_predict.py
diff options
context:
space:
mode:
Diffstat (limited to 'ryu_predict.py')
-rw-r--r--ryu_predict.py249
1 files changed, 249 insertions, 0 deletions
diff --git a/ryu_predict.py b/ryu_predict.py
new file mode 100644
index 0000000..11ee27d
--- /dev/null
+++ b/ryu_predict.py
@@ -0,0 +1,249 @@
+from ryu.base import app_manager
+from ryu.ofproto import ofproto_v1_3
+from ryu.controller import ofp_event
+from ryu.controller.handler import MAIN_DISPATCHER,CONFIG_DISPATCHER
+from ryu.controller.handler import set_ev_cls
+from ryu.lib.packet import packet
+from ryu.lib.packet import ethernet
+from ryu.lib.packet import ether_types
+from ryu.lib.packet import tcp,ipv4,udp,icmp,icmpv6,ipv6,arp
+from ryu.lib.packet import in_proto as inet
+from model.packet_predict import PacketData
+from model.benign_packetpredict import BenignPacketData
+from ryu.lib import pcaplib
+import csv
+import redis
+from product import redis_store
+from util.redis_pool import POOL
+import http.client
+import urllib.request
+import re
+import json
+from threading import Timer
+import joblib
+
+
+class SimpleSwitch(app_manager.RyuApp):
+ OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]
+
+ def __init__(self,*args,**kwargs):
+ super(SimpleSwitch,self).__init__(*args,**kwargs)
+ self.mac_to_port = {}
+ #self.switch_ip = input('输入Mininet主机的ip地址:')
+ #self.port_id = input('输入交换机s1-eth0在sflow的编号:')
+ self.model_list=[]
+ model_namelist = ["DT.model","RF.model","LightGBM.model"]
+ model_dir = "model/"
+ for model_name in model_namelist:
+ model = joblib.load(model_dir+model_name)
+ self.model_list.append(model)
+
+ def add_flow(self,datapath,priority,hard_timeout,match,actions,remind_content):
+ ofproto = datapath.ofproto
+ ofp_parser = datapath.ofproto_parser
+
+ inst = [ofp_parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS,
+ actions)]
+
+ mod = ofp_parser.OFPFlowMod(datapath=datapath,priority=priority,hard_timeout=hard_timeout,
+ match=match,instructions=inst)
+ print("install to datapath,"+remind_content)
+ datapath.send_msg(mod)
+
+ #监控当前链路流速
+ def get_sflow_bandwidth(self):
+ port_id = '3'
+ switch_ip = '10.0.1.10'
+ port = port_id + '.ifinpkts'
+ url = "http://127.0.0.1:8008/metric/"+ switch_ip + '/' + port+'/json'
+ #http://127.0.0.1:8008/metric/10.0.1.10/3.ifinpkts/json
+ sflow_data = urllib.request.urlopen(url)
+ data = sflow_data.read()
+ if data:
+ value_dict = json.loads(data)[0]
+ bandwidth = value_dict.get('metricValue')
+ print('当前数据包速率是:%s' % bandwidth)
+ return bandwidth
+ else:
+ print('当前数据包速率是:none' )
+
+ #t = Timer(5,flow_change)
+ #t.start()
+
+ @set_ev_cls(ofp_event.EventOFPSwitchFeatures,CONFIG_DISPATCHER)
+ def switch_features_handler(self,ev):
+ datapath = ev.msg.datapath
+ ofproto = datapath.ofproto
+ ofp_parser = datapath.ofproto_parser
+
+ #交换机连上的时候,下发一条默认流表,没有匹配到其他流表的都上送到交换机
+ match = ofp_parser.OFPMatch()
+ actions = [ofp_parser.OFPActionOutput(ofproto.OFPP_CONTROLLER,ofproto.OFPCML_NO_BUFFER)]
+ self.add_flow(datapath,0,0,match,actions,"default flow entry")
+
+ @set_ev_cls(ofp_event.EventOFPPacketIn,MAIN_DISPATCHER)
+ def packet_in_handler(self,ev):
+ bandwidth = self.get_sflow_bandwidth()
+ if bandwidth==None:
+ bandwidth = 0
+ msg = ev.msg
+ datapath = msg.datapath
+ ofproto = datapath.ofproto
+ parser = datapath.ofproto_parser
+ in_port = msg.match['in_port']
+
+ pkt = packet.Packet(msg.data)
+ #按协议嵌套显示数据包内容
+ #for p in pkt.protocols:
+ # print(p)
+ eth_pkt = pkt.get_protocol(ethernet.ethernet)
+ ip_pkt = pkt.get_protocol(ipv4.ipv4)
+ ipv6_pkt = pkt.get_protocol(ipv6.ipv6)
+ icmp_pkt = pkt.get_protocol(icmp.icmp)
+ arp_pkt = pkt.get_protocol(arp.arp)
+ icmpv6_pkt = pkt.get_protocol(icmpv6.icmpv6)
+ tcp_pkt = pkt.get_protocol(tcp.tcp)
+ udp_pkt = pkt.get_protocol(udp.udp)
+
+ if eth_pkt.ethertype == ether_types.ETH_TYPE_LLDP:
+ return # ignore lldp packet
+ #源/目的MAC
+ src = eth_pkt.src
+ dst = eth_pkt.dst
+
+ if ip_pkt:
+ s_ip = ip_pkt.src
+ d_ip = ip_pkt.dst
+ protocol = "IP"
+
+ #分类器不能处理ipv6,icmp包,所以直接忽略
+ if icmp_pkt:
+ return
+ if icmpv6_pkt:
+ return
+ if ipv6_pkt:
+ return
+ if arp_pkt:
+ return
+
+ if tcp_pkt:
+ protocol = 'TCP'
+ s_port = tcp_pkt.src_port
+ d_port = tcp_pkt.dst_port
+ elif udp_pkt:
+ protocol = 'UDP'
+ s_port = udp_pkt.src_port
+ d_port = udp_pkt.dst_port
+ else:
+ return
+
+ dpid = datapath.id
+ self.mac_to_port.setdefault(dpid, {})
+ #self.logger.info("packet in %s %s %s %s", dpid, src, dst, in_port)
+
+ #输入分类器判断
+ s_msg = str(msg)
+ s_pkt = str(pkt)
+ input_p = PacketData(s_msg, s_pkt)
+ pre_out = int(input_p.predict(self.model_list))
+ print("is malware????")
+ print(pre_out)
+
+ type_out = pre_out+9
+
+ #如果是IP及上层协议包,match字段用源/目的ip,仅是ethernet,就用MAC(match字段可以加)
+ if ip_pkt:
+ kwargs = dict(in_port=in_port, eth_type=ether_types.ETH_TYPE_IP,ipv4_src=s_ip,ipv4_dst=d_ip)
+ else:
+ kwargs = dict(in_port=in_port, eth_src=src, eth_dst=dst)
+ match = parser.OFPMatch(**kwargs)
+
+ #供界面使用的数据
+ '''
+ store_data = {
+ "sIP": s_ip,
+ "dIP": d_ip,
+ "sPort": s_port,
+ "dPort": d_port,
+ "protocol": protocol,
+ "sMac": src,
+ "dMac": dst,
+ "type": type_out,
+ }
+ #print(store_data)
+ redis_store("queue_test",store_data)
+ '''
+ conn = redis.Redis(connection_pool=POOL)
+ conn.set("first",bandwidth)
+
+ out_port = 1
+
+ #如果是正常包
+ #if pre_out == 0 or pre_out == 3:
+ if pre_out == 0:
+ '''
+ self.mac_to_port[dpid][src] = in_port
+ if dst in self.mac_to_port[dpid]:
+ out_port = self.mac_to_port[dpid][dst]
+ else:
+ #out_port = ofproto.OFPP_FLOOD
+ out_port = 3
+ '''
+ input_benign = BenignPacketData(s_msg)
+ benign_out = input_benign.predict()
+ print("what's app??????")
+ print(benign_out)
+ type_out = int(benign_out)
+
+ if conn.exists("app")&conn.exists("qos"):
+ QoS_app = int(conn.get("app"))
+ QoS_limit = int(conn.get("qos"))
+ if benign_out == QoS_app: #如果当前流量是用户指定需要进行QoS的应用
+ if bandwidth >= QoS_limit:
+ out_port = 4
+ else:
+ out_port = 3
+ else:
+ out_port = 3
+ else:
+ out_port = 3
+
+ actions = [parser.OFPActionOutput(out_port)]
+ #满二十次给交换机下发流表,之后的数据包就不会再上送
+ if pre_out == 3:
+ self.add_flow(datapath, 2, 1800, match, actions, "new flow")
+
+ #向界面返回实施流量数据
+ store_data = {
+ "sIP": s_ip,
+ "dIP": d_ip,
+ "sPort": s_port,
+ "dPort": d_port,
+ "protocol": protocol,
+ "sMac": src,
+ "dMac": dst,
+ "type": type_out,
+ }
+ redis_store("queue_test",store_data)
+ if out_port == 3:
+ redis_store("s3",store_data)
+ elif out_port == 4:
+ redis_store("s4",store_data)
+
+ #恶意包,分类器恶意包的规则是,一旦判断为恶意的,之后都认为是恶意的,不会判断20次
+ #if pre_out == 0 or pre_out == 2:
+ if pre_out == 1:
+ self.mac_to_port[dpid][src] = in_port
+ out_port = ofproto.OFPIT_CLEAR_ACTIONS
+ actions = [parser.OFPActionOutput(out_port)]
+ self.add_flow(datapath, 3, 3600, match, actions, "drop")
+
+ #通知交换机执行动作
+ data = None
+ if msg.buffer_id == ofproto.OFP_NO_BUFFER:
+ data = msg.data
+ out = parser.OFPPacketOut(datapath=datapath, buffer_id=msg.buffer_id,
+ in_port=in_port, actions=actions, data=data)
+ datapath.send_msg(out)
+
+