summaryrefslogtreecommitdiff
path: root/sflow-rt/extras
diff options
context:
space:
mode:
Diffstat (limited to 'sflow-rt/extras')
-rw-r--r--sflow-rt/extras/README14
-rw-r--r--sflow-rt/extras/sflow.py96
-rwxr-xr-xsflow-rt/extras/tail_flows.py20
-rwxr-xr-xsflow-rt/extras/tail_log.py20
-rwxr-xr-xsflow-rt/extras/topflows.py114
5 files changed, 264 insertions, 0 deletions
diff --git a/sflow-rt/extras/README b/sflow-rt/extras/README
new file mode 100644
index 0000000..a57fd00
--- /dev/null
+++ b/sflow-rt/extras/README
@@ -0,0 +1,14 @@
+The following Python scripts demonstrating sFlow-RT's REST API:
+
+* tail_log.py - tail the event log
+* tail_flows.py - tail flow log
+* topflows.py - text based client displaying top flows
+
+Note: The Python scripts make use of the Python requests library:
+ http://python-requests.org/
+
+The following script extends Mininet to automatically enable sFlow:
+
+* sflow.py - send sFlow and topology to local sFlow-RT instance
+
+See http://blog.sflow.com/2016/05/mininet-flow-analytics.html
diff --git a/sflow-rt/extras/sflow.py b/sflow-rt/extras/sflow.py
new file mode 100644
index 0000000..4941093
--- /dev/null
+++ b/sflow-rt/extras/sflow.py
@@ -0,0 +1,96 @@
+from mininet.net import Mininet
+from mininet.util import quietRun
+from requests import put
+from os import listdir, environ
+import re
+import socket
+import fcntl
+import array
+import struct
+import sys
+
+def wrapper(fn):
+
+ def getIfInfo(dst):
+ is_64bits = sys.maxsize > 2**32
+ struct_size = 40 if is_64bits else 32
+ s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ max_possible = 8 # initial value
+ while True:
+ bytes = max_possible * struct_size
+ names = array.array('B')
+ for i in range(0, bytes):
+ names.append(0)
+ outbytes = struct.unpack('iL', fcntl.ioctl(
+ s.fileno(),
+ 0x8912, # SIOCGIFCONF
+ struct.pack('iL', bytes, names.buffer_info()[0])
+ ))[0]
+ if outbytes == bytes:
+ max_possible *= 2
+ else:
+ break
+ try:
+ namestr = names.tobytes()
+ namestr = namestr.decode('utf-8')
+ except AttributeError:
+ namestr = names.tostring()
+ s.connect((dst, 0))
+ ip = s.getsockname()[0]
+ for i in range(0, outbytes, struct_size):
+ name = namestr[i:i+16].split('\0', 1)[0]
+ addr = socket.inet_ntoa(namestr[i+20:i+24].encode('utf-8'))
+ if addr == ip:
+ return (name,addr)
+
+ def configSFlow(net,collector,ifname,sampling,polling):
+ print("*** Enabling sFlow:")
+ sflow = 'ovs-vsctl -- --id=@sflow create sflow agent=%s target=%s sampling=%s polling=%s --' % (ifname,collector,sampling,polling)
+ for s in net.switches:
+ sflow += ' -- set bridge %s sflow=@sflow' % s
+ print(' '.join([s.name for s in net.switches]))
+ quietRun(sflow)
+
+ def sendTopology(net,agent,collector):
+ print("*** Sending topology")
+ topo = {'nodes':{}, 'links':{}}
+ for s in net.switches:
+ topo['nodes'][s.name] = {'agent':agent, 'ports':{}}
+ path = '/sys/devices/virtual/net/'
+ for child in listdir(path):
+ parts = re.match('(^.+)-(.+)', child)
+ if parts == None: continue
+ if parts.group(1) in topo['nodes']:
+ ifindex = open(path+child+'/ifindex').read().split('\n',1)[0]
+ topo['nodes'][parts.group(1)]['ports'][child] = {'ifindex': ifindex}
+ i = 0
+ for s1 in net.switches:
+ j = 0
+ for s2 in net.switches:
+ if j > i:
+ intfs = s1.connectionsTo(s2)
+ for intf in intfs:
+ s1ifIdx = topo['nodes'][s1.name]['ports'][intf[0].name]['ifindex']
+ s2ifIdx = topo['nodes'][s2.name]['ports'][intf[1].name]['ifindex']
+ linkName = '%s-%s' % (s1.name, s2.name)
+ topo['links'][linkName] = {'node1': s1.name, 'port1': intf[0].name, 'node2': s2.name, 'port2': intf[1].name}
+ j += 1
+ i += 1
+
+ put('http://%s:8008/topology/json' % collector, json=topo)
+
+ def result(*args,**kwargs):
+ res = fn(*args,**kwargs)
+ net = args[0]
+ collector = environ.get('COLLECTOR','127.0.0.1')
+ sampling = environ.get('SAMPLING','10')
+ polling = environ.get('POLLING','10')
+ (ifname, agent) = getIfInfo(collector)
+ configSFlow(net,collector,ifname,sampling,polling)
+ sendTopology(net,agent,collector)
+ return res
+
+ return result
+
+setattr(Mininet, 'start', wrapper(Mininet.__dict__['start']))
+
diff --git a/sflow-rt/extras/tail_flows.py b/sflow-rt/extras/tail_flows.py
new file mode 100755
index 0000000..faf6213
--- /dev/null
+++ b/sflow-rt/extras/tail_flows.py
@@ -0,0 +1,20 @@
+#!/usr/bin/env python
+import requests
+import signal
+
+def sig_handler(signal,frame):
+ exit(0)
+signal.signal(signal.SIGINT, sig_handler)
+
+flowurl = 'http://localhost:8008/flows/json?maxFlows=10&timeout=60'
+flowID = -1
+while 1 == 1:
+ r = requests.get(flowurl + "&flowID=" + str(flowID))
+ if r.status_code != 200: break
+ flows = r.json()
+ if len(flows) == 0: continue
+
+ flowID = flows[0]["flowID"]
+ flows.reverse()
+ for f in flows:
+ print(str(f['flowID']) + ',' + f['name'] + ',' + f['flowKeys'] + ',' + str(f['value']) + ',' + str(f['start']) + ',' + str(f['end']) + ',' + f['agent'] + ',' + str(f['dataSource']))
diff --git a/sflow-rt/extras/tail_log.py b/sflow-rt/extras/tail_log.py
new file mode 100755
index 0000000..7d29efb
--- /dev/null
+++ b/sflow-rt/extras/tail_log.py
@@ -0,0 +1,20 @@
+#!/usr/bin/env python
+import requests
+import signal
+
+def sig_handler(signal,frame):
+ exit(0)
+signal.signal(signal.SIGINT, sig_handler)
+
+eventurl = 'http://localhost:8008/events/json?maxEvents=10&timeout=60'
+eventID = -1
+while 1 == 1:
+ r = requests.get(eventurl + "&eventID=" + str(eventID))
+ if r.status_code != 200: break
+ events = r.json()
+ if len(events) == 0: continue
+
+ eventID = events[0]["eventID"]
+ events.reverse()
+ for e in events:
+ print(str(e['eventID']) + ',' + str(e['timestamp']) + ',' + e['thresholdID'] + ',' + e['metric'] + ',' + str(e['threshold']) + ',' + str(e['value']) + ',' + e['agent'] + ',' + e['dataSource'])
diff --git a/sflow-rt/extras/topflows.py b/sflow-rt/extras/topflows.py
new file mode 100755
index 0000000..ee234c8
--- /dev/null
+++ b/sflow-rt/extras/topflows.py
@@ -0,0 +1,114 @@
+#!/usr/bin/env python
+import sys
+import requests
+import signal
+import curses
+import time
+import math
+
+def eng_str( x, format='%s', si=False):
+ sign = ''
+ if x < 0:
+ x = -x
+ sign = '-'
+ exp = int( math.floor( math.log10( x)))
+ exp3 = exp - ( exp % 3)
+ x3 = x / float( 10 ** exp3)
+
+ if si and exp3 >= -24 and exp3 <= 24 and exp3 != 0:
+ exp3_text = 'yzafpnum KMGTPEZY'[ ( exp3 - (-24)) / 3]
+ elif exp3 == 0:
+ exp3_text = ''
+ else:
+ exp3_text = 'e%s' % exp3
+
+ return ( '%s'+format+'%s') % ( sign, x3, exp3_text)
+
+def endSession():
+ curses.nocbreak(); stdscr.keypad(0); curses.echo()
+ curses.endwin()
+
+def sig_handler(signal,frame):
+ endSession()
+ exit(0)
+signal.signal(signal.SIGINT, sig_handler)
+
+if __name__ == '__main__':
+ import optparse
+ import json
+ import os.path
+ parser = optparse.OptionParser()
+ parser.add_option("", "--flow", dest="flow", default="flows",
+ help="name of sFlow-RT flow defintion")
+ parser.add_option("", "--server", dest="server", default="localhost",
+ help="name or IP address of sFlow-RT server")
+ (options,args) = parser.parse_args()
+
+specurl = 'http://' + options.server + ':8008/flow/' + options.flow + '/json'
+
+r = requests.get(specurl)
+if r.status_code != 200:
+ print('Cannot retrieve flow definition for ' + options.flow)
+ sys.exit(1)
+spec = r.json()
+keyfields = str(spec['keys']).split(',')
+valuefield = str(spec['value'])
+fieldsep = str(spec['fs'])
+
+stdscr = curses.initscr()
+curses.noecho()
+curses.cbreak()
+stdscr.keypad(1)
+stdscr.nodelay(1)
+pad = None
+
+try:
+ while True:
+
+ ch = -1
+ ch = stdscr.getch()
+ if ch == ord('q'): break
+ if ch == ord('Q'): break
+ if ch == curses.KEY_RESIZE or pad is None:
+ (maxY,maxX) = stdscr.getmaxyx()
+ cw = maxX / (len(keyfields) + 1)
+ flowsurl = 'http://' + options.server + ':8008/activeflows/ALL/' + options.flow + '/json?maxFlows=' + str(maxY - 2)
+ pad = curses.newpad(maxY,maxX)
+
+ # get latest flow data
+ r = requests.get(flowsurl)
+ if r.status_code != 200: break
+ flows = r.json()
+ if len(flows) == 0: continue
+
+ # write headers
+ pad.clear()
+ for h in range(0,len(keyfields)):
+ pad.addstr(0,h * cw,format(keyfields[h],"<"+str(cw)),curses.A_STANDOUT)
+ pad.addstr(0,len(keyfields)*cw,format(valuefield,">"+str(cw)), curses.A_STANDOUT)
+
+ # write rows
+ for r in range(0, len(flows)):
+ keys = flows[r]['key'].split(',')
+ value = int(flows[r]['value'])
+ if value == 0: continue
+
+ for c in range(0,len(keys)):
+ pad.addstr(1+r,c * cw,format(keys[c],"<"+str(cw)))
+ # pad.addstr(1+r,len(keys)*cw,format(value,">"+str(cw)+".6g"))
+ pad.addstr(1+r,len(keys)*cw,format(eng_str(value,"%.3f",True),">"+str(cw)))
+
+ # sync to screen - may fail during resize
+ try: pad.refresh(0,0,0,0,maxY,maxX)
+ except: pass
+
+ # sleep may be interrupted - e.g. during resize
+ # so put this in a loop to make sure we don't
+ # thrash and send too many requests
+ wake = time.time() + 1.9
+ while True:
+ time.sleep(2)
+ if time.time() >= wake: break
+finally:
+ endSession()
+