1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
|
from ryu.base import app_manager
from ryu.ofproto import ofproto_v1_3
from ryu.controller import ofp_event
from ryu.controller.handler import MAIN_DISPATCHER,CONFIG_DISPATCHER
from ryu.controller.handler import set_ev_cls
from ryu.lib.packet import packet
from ryu.lib.packet import ethernet
from ryu.lib.packet import ether_types
from ryu.lib.packet import tcp,ipv4,udp,icmp,icmpv6,ipv6,arp
from ryu.lib.packet import in_proto as inet
from model.packet_predict import PacketData
from model.benign_packetpredict import BenignPacketData
from ryu.lib import pcaplib
import csv
import redis
from product import redis_store
from util.redis_pool import POOL
import http.client
import urllib.request
import re
import json
from threading import Timer
import joblib
class SimpleSwitch(app_manager.RyuApp):
    """Ryu OpenFlow 1.3 app that classifies packet-ins, then forwards or drops them.

    For every TCP/UDP packet sent up by a switch the app:

    1. reads the current ingress packet rate from an sFlow REST endpoint,
    2. runs the packet through the classifiers loaded from ``MODEL_DIR``,
    3. for a benign verdict, asks a second classifier which application the
       packet belongs to and picks an output port (the QoS port when the
       application matches the Redis key ``app`` and the packet rate has
       reached the Redis key ``qos``),
    4. for a malicious verdict, installs a drop flow,
    5. sends the matching packet-out to the switch.

    Per-packet records are pushed to Redis through ``redis_store``.
    """

    OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]

    # Serialized classifiers loaded once at start-up.
    MODEL_DIR = "model/"
    MODEL_NAMES = ("DT.model", "RF.model", "LightGBM.model")

    # sFlow REST endpoint that reports the ingress packet counter of one port.
    SFLOW_URL_TEMPLATE = "http://127.0.0.1:8008/metric/{switch_ip}/{port_id}.ifinpkts/json"
    SFLOW_SWITCH_IP = '10.0.1.10'
    SFLOW_PORT_ID = '3'
    SFLOW_TIMEOUT = 2.0  # seconds; keeps a dead collector from stalling packet-in handling

    # Switch ports used for benign traffic.
    DEFAULT_OUT_PORT = 3
    QOS_OUT_PORT = 4
    # Redis queue that receives the record of a packet sent out of each port.
    PORT_QUEUES = {DEFAULT_OUT_PORT: "s3", QOS_OUT_PORT: "s4"}

    # Classifier verdicts.
    # NOTE(review): the original gate `pre_out == 0 or pre_out == 3` was
    # commented out in favour of `pre_out == 0`, which made the
    # "install a flow when the verdict is 3" step unreachable. The current
    # behaviour is kept (only 0 is benign); add 3 to BENIGN_VERDICTS to
    # re-enable flow installation once the intended semantics are confirmed.
    BENIGN_VERDICTS = (0,)
    FLOW_INSTALL_VERDICT = 3
    MALICIOUS_VERDICT = 1

    def __init__(self, *args, **kwargs):
        """Initialise the app and load every classifier listed in ``MODEL_NAMES``."""
        super(SimpleSwitch, self).__init__(*args, **kwargs)
        self.mac_to_port = {}
        # NOTE: joblib.load unpickles the file, which can execute arbitrary
        # code -- only load model files from a trusted location.
        self.model_list = [
            joblib.load(self.MODEL_DIR + model_name)
            for model_name in self.MODEL_NAMES
        ]

    def add_flow(self, datapath, priority, hard_timeout, match, actions, remind_content):
        """Install a flow entry that applies ``actions`` to packets matching ``match``.

        Args:
            datapath: switch connection the flow-mod is sent to.
            priority: priority of the new flow entry.
            hard_timeout: lifetime of the entry in seconds (0 means no expiry).
            match: ``OFPMatch`` selecting the packets.
            actions: list of actions; an empty list makes the entry drop
                matching packets.
            remind_content: short label printed when the entry is sent.
        """
        ofproto = datapath.ofproto
        ofp_parser = datapath.ofproto_parser
        inst = [ofp_parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS,
                                                 actions)]
        mod = ofp_parser.OFPFlowMod(datapath=datapath, priority=priority,
                                    hard_timeout=hard_timeout,
                                    match=match, instructions=inst)
        print("install to datapath,"+remind_content)
        datapath.send_msg(mod)

    def get_sflow_bandwidth(self, switch_ip=None, port_id=None):
        """Return the current packet rate of the monitored link, or ``None``.

        Queries ``SFLOW_URL_TEMPLATE`` and returns the ``metricValue`` of the
        first entry in the JSON reply.

        Args:
            switch_ip: agent address used in the URL; defaults to
                ``SFLOW_SWITCH_IP``.
            port_id: sFlow port index used in the URL; defaults to
                ``SFLOW_PORT_ID``.

        Returns:
            The reported metric value, or ``None`` when the endpoint cannot be
            reached, the reply is empty or is not valid JSON.
        """
        if switch_ip is None:
            switch_ip = self.SFLOW_SWITCH_IP
        if port_id is None:
            port_id = self.SFLOW_PORT_ID
        url = self.SFLOW_URL_TEMPLATE.format(switch_ip=switch_ip, port_id=port_id)

        metrics = None
        try:
            # The context manager closes the HTTP response even on error.
            with urllib.request.urlopen(url, timeout=self.SFLOW_TIMEOUT) as response:
                data = response.read()
            if data:
                metrics = json.loads(data)
        except (OSError, ValueError, http.client.HTTPException) as err:
            # URLError/timeouts are OSError; malformed JSON is ValueError.
            self.logger.warning("sFlow query %s failed: %s", url, err)

        if not metrics:
            print('当前数据包速率是:none')
            return None
        bandwidth = metrics[0].get('metricValue')
        print('当前数据包速率是:%s' % bandwidth)
        return bandwidth

    @set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)
    def switch_features_handler(self, ev):
        """Install the table-miss entry when a switch connects.

        Packets that match no other flow entry are sent, unbuffered, to the
        controller.
        """
        datapath = ev.msg.datapath
        ofproto = datapath.ofproto
        ofp_parser = datapath.ofproto_parser
        match = ofp_parser.OFPMatch()
        actions = [ofp_parser.OFPActionOutput(ofproto.OFPP_CONTROLLER,
                                              ofproto.OFPCML_NO_BUFFER)]
        self.add_flow(datapath, 0, 0, match, actions, "default flow entry")

    def _select_out_port(self, conn, benign_out, bandwidth):
        """Pick the output port for a benign packet.

        Returns ``QOS_OUT_PORT`` only when Redis holds both ``app`` and
        ``qos``, the packet's application equals ``app`` and the measured
        packet rate is known and at least ``qos``; otherwise
        ``DEFAULT_OUT_PORT``.
        """
        if not (conn.exists("app") and conn.exists("qos")):
            return self.DEFAULT_OUT_PORT
        qos_app = int(conn.get("app"))
        qos_limit = int(conn.get("qos"))
        # Is this the application the user asked QoS for?
        if benign_out == qos_app:
            # An unknown rate (None) cannot be compared; treat it as below the limit.
            if bandwidth is not None and bandwidth >= qos_limit:
                return self.QOS_OUT_PORT
        return self.DEFAULT_OUT_PORT

    @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
    def packet_in_handler(self, ev):
        """Classify a packet sent up by a switch and forward or drop it.

        LLDP, ICMP, ICMPv6, IPv6 and ARP packets, and anything that is neither
        TCP nor UDP, are ignored because the classifier cannot handle them.
        """
        msg = ev.msg
        datapath = msg.datapath
        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser
        in_port = msg.match['in_port']
        pkt = packet.Packet(msg.data)

        eth_pkt = pkt.get_protocol(ethernet.ethernet)
        if eth_pkt is None or eth_pkt.ethertype == ether_types.ETH_TYPE_LLDP:
            return  # ignore lldp (and non-ethernet) packets

        # The classifier cannot process these protocols, so skip them.
        for unsupported in (icmp.icmp, icmpv6.icmpv6, ipv6.ipv6, arp.arp):
            if pkt.get_protocol(unsupported):
                return

        tcp_pkt = pkt.get_protocol(tcp.tcp)
        udp_pkt = pkt.get_protocol(udp.udp)
        if tcp_pkt:
            protocol = 'TCP'
            s_port = tcp_pkt.src_port
            d_port = tcp_pkt.dst_port
        elif udp_pkt:
            protocol = 'UDP'
            s_port = udp_pkt.src_port
            d_port = udp_pkt.dst_port
        else:
            return

        # Source / destination MAC and, when present, IPv4 addresses.
        src = eth_pkt.src
        dst = eth_pkt.dst
        ip_pkt = pkt.get_protocol(ipv4.ipv4)
        s_ip = ip_pkt.src if ip_pkt else None
        d_ip = ip_pkt.dst if ip_pkt else None

        dpid = datapath.id
        self.mac_to_port.setdefault(dpid, {})

        # Queried only for packets that are actually processed, so ignored
        # packets do not cost an HTTP round trip.
        bandwidth = self.get_sflow_bandwidth()

        # Ask the classifier whether the packet is malicious.
        s_msg = str(msg)
        s_pkt = str(pkt)
        input_p = PacketData(s_msg, s_pkt)
        pre_out = int(input_p.predict(self.model_list))
        print("is malware????")
        print(pre_out)

        # Match on source/destination IP for IP packets, on MAC otherwise.
        if ip_pkt:
            kwargs = dict(in_port=in_port, eth_type=ether_types.ETH_TYPE_IP,
                          ipv4_src=s_ip, ipv4_dst=d_ip)
        else:
            kwargs = dict(in_port=in_port, eth_src=src, eth_dst=dst)
        match = parser.OFPMatch(**kwargs)

        conn = redis.Redis(connection_pool=POOL)
        if bandwidth is not None:
            # Redis cannot store None; keep the previous value when the rate is unknown.
            conn.set("first", bandwidth)

        if pre_out in self.BENIGN_VERDICTS:
            # Benign packet: identify the application and choose the port.
            input_benign = BenignPacketData(s_msg)
            benign_out = input_benign.predict()
            print("what's app??????")
            print(benign_out)
            type_out = int(benign_out)
            out_port = self._select_out_port(conn, benign_out, bandwidth)
            actions = [parser.OFPActionOutput(out_port)]
            # After twenty benign verdicts a flow entry is installed so that
            # later packets of the flow are no longer sent to the controller.
            if pre_out == self.FLOW_INSTALL_VERDICT:
                self.add_flow(datapath, 2, 1800, match, actions, "new flow")
            # Live traffic record for the UI.
            store_data = {
                "sIP": s_ip,
                "dIP": d_ip,
                "sPort": s_port,
                "dPort": d_port,
                "protocol": protocol,
                "sMac": src,
                "dMac": dst,
                "type": type_out,
            }
            redis_store("queue_test", store_data)
            port_queue = self.PORT_QUEUES.get(out_port)
            if port_queue is not None:
                redis_store(port_queue, store_data)
        elif pre_out == self.MALICIOUS_VERDICT:
            # Malicious packet: once a flow is judged malicious it stays
            # malicious, so the drop entry is installed immediately.
            self.mac_to_port[dpid][src] = in_port
            # An empty action list means "drop". OFPIT_CLEAR_ACTIONS is an
            # instruction type, not a port, and must not be used as an output.
            actions = []
            self.add_flow(datapath, 3, 3600, match, actions, "drop")
        else:
            # No forwarding decision exists for this verdict; send nothing.
            self.logger.warning("unhandled classifier verdict %s, packet ignored",
                                pre_out)
            return

        # Tell the switch what to do with this packet.
        data = None
        if msg.buffer_id == ofproto.OFP_NO_BUFFER:
            data = msg.data
        out = parser.OFPPacketOut(datapath=datapath, buffer_id=msg.buffer_id,
                                  in_port=in_port, actions=actions, data=data)
        datapath.send_msg(out)
|