#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Ping a host over TCP, much like the ``ping`` command.

Each probe opens a TCP connection to ``host:port``, measures how long the
connect (plus the following shutdown) takes, and reports it in milliseconds.
"""

import socket
import sys
import time
from collections import namedtuple
from functools import partial
from timeit import default_timer as timer

import click
import numpy as np
from six import print_
from six.moves import zip_longest

# One summary record per probed host; consumed by ``Print.raw``.
Statistics = namedtuple('Statistics', [
    'host',
    'port',
    'successed',
    'failed',
    'success_rate',
    'minimum',
    'maximum',
    'average'])

# ``print`` that flushes after every call so progress lines show up at once.
iprint = partial(print_, flush=True)


def avg(x):
    """Return the arithmetic mean of the sized iterable *x* as a float.

    Raises:
        ZeroDivisionError: if *x* is empty.
    """
    return sum(x) / float(len(x))


class Socket(object):
    """Thin wrapper around ``socket.socket`` with a fixed timeout."""

    def __init__(self, family, type_, timeout):
        """Create the underlying socket and apply *timeout* (seconds) to it."""
        s = socket.socket(family, type_)
        s.settimeout(timeout)
        self._s = s

    def connect(self, host, port=80):
        """Connect to ``(host, port)``; *port* is coerced with ``int``."""
        self._s.connect((host, int(port)))

    def shutdown(self):
        """Shut down the receiving half of the connection."""
        self._s.shutdown(socket.SHUT_RD)

    def close(self):
        """Close the underlying socket."""
        self._s.close()


class Print(object):
    """Collects ``Statistics`` rows and renders them as plain text."""

    def __init__(self):
        self.table_field_names = []
        self.rows = []

    @property
    def raw(self):
        """Return the plain-text report for every collected row.

        Each row contributes three lines (header, counters, timings), each
        one starting with a newline; the reports are concatenated as-is.
        """
        statistics_group = []
        for row in self.rows:
            total = row.successed + row.failed
            statistics_header = '\n--- {}[:{}] tcping statistics ---'.format(
                row.host, row.port)
            statistics_body = (
                '\n{} connections, {} successed, {} failed, '
                '{} success rate'.format(
                    total, row.successed, row.failed, row.success_rate))
            statistics_footer = (
                '\nminimum = {}, maximum = {}, average = {}'.format(
                    row.minimum, row.maximum, row.average))
            statistics_group.append(
                statistics_header + statistics_body + statistics_footer)
        return ''.join(statistics_group)

    def add_statistics(self, row):
        """Append one ``Statistics`` row to the report."""
        self.rows.append(row)


class Timer(object):
    """Wall-clock stopwatch built on ``timeit.default_timer``."""

    def __init__(self):
        self._start = 0
        self._stop = 0

    def start(self):
        """Record the start timestamp."""
        self._start = timer()

    def stop(self):
        """Record the stop timestamp."""
        self._stop = timer()

    def cost(self, funcs, args):
        """Call every function in *funcs* in order and time the whole run.

        *funcs* and *args* are paired positionally with ``zip_longest``; a
        falsy entry in *args* (``None``, an empty tuple, or a missing one)
        means the function is called without arguments.

        Returns:
            Elapsed time in seconds between the first call and the return
            of the last one.

        Any exception raised by a callable propagates; the stop timestamp
        is not recorded in that case.
        """
        self.start()
        for func, arg in zip_longest(funcs, args):
            if arg:
                func(*arg)
            else:
                func()
        self.stop()
        return self._stop - self._start


class Ping(object):
    """Runs TCP connect probes against one ``host:port`` and keeps counters."""

    def __init__(self, host, port=80, timeout=1):
        """Store the target and reset all counters.

        Args:
            host: Host name or IP address literal to probe.
            port: TCP port to connect to.
            timeout: Per-connection timeout in seconds.
        """
        self.print_ = Print()
        self.timer = Timer()

        self._successed = 0
        self._failed = 0
        self._conn_times = []
        self._host = host
        self._port = port
        self._timeout = timeout

    def _create_socket(self, family, type_):
        """Return a new ``Socket`` using this instance's timeout."""
        return Socket(family, type_, self._timeout)

    def _success_rate(self):
        """Return the success percentage formatted with two decimals.

        Returns ``'0.00'`` when no probe has completed yet.
        """
        count = self._successed + self._failed
        try:
            rate = (float(self._successed) / count) * 100
            rate = '{0:.2f}'.format(rate)
        except ZeroDivisionError:
            rate = '0.00'
        return rate

    def statistics(self, n):
        """Return the mean connect time in milliseconds as a string.

        Args:
            n: Unused; kept so existing callers keep working.

        Returns:
            The average of all recorded connect times formatted with three
            decimals, or ``'0.000'`` when no probe has succeeded.
        """
        # Fall back to a single zero so the mean is defined with no samples.
        conn_times = self._conn_times or [0]
        return format(avg(conn_times), ".3f")

    @property
    def result(self):
        """The ``Print`` collector owned by this instance."""
        return self.print_

    @property
    def status(self):
        """``True`` when no probe has succeeded so far."""
        return self._successed == 0

    def _run_probes(self, family, count):
        """Run *count* connect probes over the given socket *family*.

        Every probe waits one second, connects, prints one progress line and
        updates the counters. A timeout counts as a failure; any other
        exception (for example a refused connection, a resolver error or
        ``KeyboardInterrupt``) propagates to the caller after the socket has
        been closed.
        """
        for n in range(1, count + 1):
            s = self._create_socket(family, socket.SOCK_STREAM)
            try:
                # Pace the probes one second apart, like ``ping`` does.
                time.sleep(1)
                cost_time = self.timer.cost(
                    (s.connect, s.shutdown),
                    ((self._host, self._port), None))
                s_runtime = 1000 * cost_time

                iprint("Connected to %s[:%s]: seq=%d time=%.2f ms" % (
                    self._host, self._port, n, s_runtime))

                self._conn_times.append(s_runtime)
            except socket.timeout:
                iprint("Connected to %s[:%s]: seq=%d time out!" % (
                    self._host, self._port, n))
                self._failed += 1
            else:
                self._successed += 1
            finally:
                s.close()

    def ping(self, count=10):
        """Probe the target *count* times over IPv4."""
        self._run_probes(socket.AF_INET, count)

    def tcping(self, count=5, family=4):
        """Probe the target *count* times over IPv4 or IPv6.

        Args:
            count: Number of probes to send.
            family: ``4`` selects IPv4; any other value selects IPv6.
        """
        if family == 4:
            address_family = socket.AF_INET
        else:
            address_family = socket.AF_INET6
        self._run_probes(address_family, count)


def cli(host, port, count, timeout):
    """Probe ``host:port`` *count* times and return the mean latency.

    Args:
        host: Host name or IP address literal.
        port: TCP port to connect to.
        count: Number of probes to send.
        timeout: Per-connection timeout in seconds.

    Returns:
        The mean connect time in milliseconds, formatted with three
        decimals (see ``Ping.statistics``).
    """
    ping = Ping(host, port, timeout)
    # Only IPv6 literals contain a colon. Host names (dotted or not, e.g.
    # "localhost") and IPv4 literals are probed over IPv4.
    if ":" in host:
        ping.tcping(count, 6)
    else:
        ping.tcping(count, 4)
    return ping.statistics(count)