diff options
Diffstat (limited to 'sflow-rt/extras')
| -rw-r--r-- | sflow-rt/extras/README | 14 | ||||
| -rw-r--r-- | sflow-rt/extras/sflow.py | 96 | ||||
| -rwxr-xr-x | sflow-rt/extras/tail_flows.py | 20 | ||||
| -rwxr-xr-x | sflow-rt/extras/tail_log.py | 20 | ||||
| -rwxr-xr-x | sflow-rt/extras/topflows.py | 114 |
5 files changed, 264 insertions, 0 deletions
diff --git a/sflow-rt/extras/README b/sflow-rt/extras/README new file mode 100644 index 0000000..a57fd00 --- /dev/null +++ b/sflow-rt/extras/README @@ -0,0 +1,14 @@ +The following Python scripts demonstrating sFlow-RT's REST API: + +* tail_log.py - tail the event log +* tail_flows.py - tail flow log +* topflows.py - text based client displaying top flows + +Note: The Python scripts make use of the Python requests library: + http://python-requests.org/ + +The following script extends Mininet to automatically enable sFlow: + +* sflow.py - send sFlow and topology to local sFlow-RT instance + +See http://blog.sflow.com/2016/05/mininet-flow-analytics.html diff --git a/sflow-rt/extras/sflow.py b/sflow-rt/extras/sflow.py new file mode 100644 index 0000000..4941093 --- /dev/null +++ b/sflow-rt/extras/sflow.py @@ -0,0 +1,96 @@ +from mininet.net import Mininet +from mininet.util import quietRun +from requests import put +from os import listdir, environ +import re +import socket +import fcntl +import array +import struct +import sys + +def wrapper(fn): + + def getIfInfo(dst): + is_64bits = sys.maxsize > 2**32 + struct_size = 40 if is_64bits else 32 + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + max_possible = 8 # initial value + while True: + bytes = max_possible * struct_size + names = array.array('B') + for i in range(0, bytes): + names.append(0) + outbytes = struct.unpack('iL', fcntl.ioctl( + s.fileno(), + 0x8912, # SIOCGIFCONF + struct.pack('iL', bytes, names.buffer_info()[0]) + ))[0] + if outbytes == bytes: + max_possible *= 2 + else: + break + try: + namestr = names.tobytes() + namestr = namestr.decode('utf-8') + except AttributeError: + namestr = names.tostring() + s.connect((dst, 0)) + ip = s.getsockname()[0] + for i in range(0, outbytes, struct_size): + name = namestr[i:i+16].split('\0', 1)[0] + addr = socket.inet_ntoa(namestr[i+20:i+24].encode('utf-8')) + if addr == ip: + return (name,addr) + + def configSFlow(net,collector,ifname,sampling,polling): + print("*** Enabling sFlow:") + sflow = 'ovs-vsctl -- --id=@sflow create sflow agent=%s target=%s sampling=%s polling=%s --' % (ifname,collector,sampling,polling) + for s in net.switches: + sflow += ' -- set bridge %s sflow=@sflow' % s + print(' '.join([s.name for s in net.switches])) + quietRun(sflow) + + def sendTopology(net,agent,collector): + print("*** Sending topology") + topo = {'nodes':{}, 'links':{}} + for s in net.switches: + topo['nodes'][s.name] = {'agent':agent, 'ports':{}} + path = '/sys/devices/virtual/net/' + for child in listdir(path): + parts = re.match('(^.+)-(.+)', child) + if parts == None: continue + if parts.group(1) in topo['nodes']: + ifindex = open(path+child+'/ifindex').read().split('\n',1)[0] + topo['nodes'][parts.group(1)]['ports'][child] = {'ifindex': ifindex} + i = 0 + for s1 in net.switches: + j = 0 + for s2 in net.switches: + if j > i: + intfs = s1.connectionsTo(s2) + for intf in intfs: + s1ifIdx = topo['nodes'][s1.name]['ports'][intf[0].name]['ifindex'] + s2ifIdx = topo['nodes'][s2.name]['ports'][intf[1].name]['ifindex'] + linkName = '%s-%s' % (s1.name, s2.name) + topo['links'][linkName] = {'node1': s1.name, 'port1': intf[0].name, 'node2': s2.name, 'port2': intf[1].name} + j += 1 + i += 1 + + put('http://%s:8008/topology/json' % collector, json=topo) + + def result(*args,**kwargs): + res = fn(*args,**kwargs) + net = args[0] + collector = environ.get('COLLECTOR','127.0.0.1') + sampling = environ.get('SAMPLING','10') + polling = environ.get('POLLING','10') + (ifname, agent) = getIfInfo(collector) + configSFlow(net,collector,ifname,sampling,polling) + sendTopology(net,agent,collector) + return res + + return result + +setattr(Mininet, 'start', wrapper(Mininet.__dict__['start'])) + diff --git a/sflow-rt/extras/tail_flows.py b/sflow-rt/extras/tail_flows.py new file mode 100755 index 0000000..faf6213 --- /dev/null +++ b/sflow-rt/extras/tail_flows.py @@ -0,0 +1,20 @@ +#!/usr/bin/env python +import requests +import signal + +def sig_handler(signal,frame): + exit(0) +signal.signal(signal.SIGINT, sig_handler) + +flowurl = 'http://localhost:8008/flows/json?maxFlows=10&timeout=60' +flowID = -1 +while 1 == 1: + r = requests.get(flowurl + "&flowID=" + str(flowID)) + if r.status_code != 200: break + flows = r.json() + if len(flows) == 0: continue + + flowID = flows[0]["flowID"] + flows.reverse() + for f in flows: + print(str(f['flowID']) + ',' + f['name'] + ',' + f['flowKeys'] + ',' + str(f['value']) + ',' + str(f['start']) + ',' + str(f['end']) + ',' + f['agent'] + ',' + str(f['dataSource'])) diff --git a/sflow-rt/extras/tail_log.py b/sflow-rt/extras/tail_log.py new file mode 100755 index 0000000..7d29efb --- /dev/null +++ b/sflow-rt/extras/tail_log.py @@ -0,0 +1,20 @@ +#!/usr/bin/env python +import requests +import signal + +def sig_handler(signal,frame): + exit(0) +signal.signal(signal.SIGINT, sig_handler) + +eventurl = 'http://localhost:8008/events/json?maxEvents=10&timeout=60' +eventID = -1 +while 1 == 1: + r = requests.get(eventurl + "&eventID=" + str(eventID)) + if r.status_code != 200: break + events = r.json() + if len(events) == 0: continue + + eventID = events[0]["eventID"] + events.reverse() + for e in events: + print(str(e['eventID']) + ',' + str(e['timestamp']) + ',' + e['thresholdID'] + ',' + e['metric'] + ',' + str(e['threshold']) + ',' + str(e['value']) + ',' + e['agent'] + ',' + e['dataSource']) diff --git a/sflow-rt/extras/topflows.py b/sflow-rt/extras/topflows.py new file mode 100755 index 0000000..ee234c8 --- /dev/null +++ b/sflow-rt/extras/topflows.py @@ -0,0 +1,114 @@ +#!/usr/bin/env python +import sys +import requests +import signal +import curses +import time +import math + +def eng_str( x, format='%s', si=False): + sign = '' + if x < 0: + x = -x + sign = '-' + exp = int( math.floor( math.log10( x))) + exp3 = exp - ( exp % 3) + x3 = x / float( 10 ** exp3) + + if si and exp3 >= -24 and exp3 <= 24 and exp3 != 0: + exp3_text = 'yzafpnum KMGTPEZY'[ ( exp3 - (-24)) / 3] + elif exp3 == 0: + exp3_text = '' + else: + exp3_text = 'e%s' % exp3 + + return ( '%s'+format+'%s') % ( sign, x3, exp3_text) + +def endSession(): + curses.nocbreak(); stdscr.keypad(0); curses.echo() + curses.endwin() + +def sig_handler(signal,frame): + endSession() + exit(0) +signal.signal(signal.SIGINT, sig_handler) + +if __name__ == '__main__': + import optparse + import json + import os.path + parser = optparse.OptionParser() + parser.add_option("", "--flow", dest="flow", default="flows", + help="name of sFlow-RT flow defintion") + parser.add_option("", "--server", dest="server", default="localhost", + help="name or IP address of sFlow-RT server") + (options,args) = parser.parse_args() + +specurl = 'http://' + options.server + ':8008/flow/' + options.flow + '/json' + +r = requests.get(specurl) +if r.status_code != 200: + print('Cannot retrieve flow definition for ' + options.flow) + sys.exit(1) +spec = r.json() +keyfields = str(spec['keys']).split(',') +valuefield = str(spec['value']) +fieldsep = str(spec['fs']) + +stdscr = curses.initscr() +curses.noecho() +curses.cbreak() +stdscr.keypad(1) +stdscr.nodelay(1) +pad = None + +try: + while True: + + ch = -1 + ch = stdscr.getch() + if ch == ord('q'): break + if ch == ord('Q'): break + if ch == curses.KEY_RESIZE or pad is None: + (maxY,maxX) = stdscr.getmaxyx() + cw = maxX / (len(keyfields) + 1) + flowsurl = 'http://' + options.server + ':8008/activeflows/ALL/' + options.flow + '/json?maxFlows=' + str(maxY - 2) + pad = curses.newpad(maxY,maxX) + + # get latest flow data + r = requests.get(flowsurl) + if r.status_code != 200: break + flows = r.json() + if len(flows) == 0: continue + + # write headers + pad.clear() + for h in range(0,len(keyfields)): + pad.addstr(0,h * cw,format(keyfields[h],"<"+str(cw)),curses.A_STANDOUT) + pad.addstr(0,len(keyfields)*cw,format(valuefield,">"+str(cw)), curses.A_STANDOUT) + + # write rows + for r in range(0, len(flows)): + keys = flows[r]['key'].split(',') + value = int(flows[r]['value']) + if value == 0: continue + + for c in range(0,len(keys)): + pad.addstr(1+r,c * cw,format(keys[c],"<"+str(cw))) + # pad.addstr(1+r,len(keys)*cw,format(value,">"+str(cw)+".6g")) + pad.addstr(1+r,len(keys)*cw,format(eng_str(value,"%.3f",True),">"+str(cw))) + + # sync to screen - may fail during resize + try: pad.refresh(0,0,0,0,maxY,maxX) + except: pass + + # sleep may be interrupted - e.g. during resize + # so put this in a loop to make sure we don't + # thrash and send too many requests + wake = time.time() + 1.9 + while True: + time.sleep(2) + if time.time() >= wake: break +finally: + endSession() + |
