# System statistics API: uptime in days, IPv6 / dual-stack record totals, and daily growth.
import datetime
import json

import pymysql
from apiflask import APIBlueprint

from settings import *

bp = APIBlueprint('sys', __name__, url_prefix='/sys')


class DataAccess():
    """Thin wrapper around one shared MySQL connection and cursor.

    On construction it connects to the server, creates the data database and
    the configuration table if they do not exist yet, and selects the database.

    Attributes:
        tabname: name of the configuration table (MYSQL_DATADB_CONFIGTAB).
        conn: the open pymysql connection.
        cursor: a cursor on ``conn`` that is reused by every query.
    """

    def __init__(self):
        # MySQL connection.
        dbname = MYSQL_DATADB
        self.tabname = MYSQL_DATADB_CONFIGTAB
        # autocommit=True: this connection lives for the whole process and the
        # read paths never commit. With pymysql's default (autocommit off) and
        # InnoDB's default REPEATABLE READ isolation, later SELECTs would keep
        # returning the snapshot taken by the first read instead of fresh data.
        self.conn = pymysql.connect(host=MYSQL_HOST, user='root', password=MYSQL_PAWD,
                                    port=MYSQL_PORT, autocommit=True)
        self.cursor = self.conn.cursor()

        # Bootstrap statements. Identifiers cannot be bound as query
        # parameters, so the names from settings are interpolated directly.
        dbsql = "CREATE DATABASE IF NOT EXISTS %s" % dbname
        tablesql = """
            CREATE TABLE IF NOT EXISTS %s (
                id INT auto_increment PRIMARY KEY ,
                name VARCHAR(50) NOT NULL,
                info VARCHAR(100)
            )ENGINE=innodb DEFAULT CHARSET=utf8;
        """ % self.tabname

        # Run the bootstrap statements.
        self.cursor.execute(dbsql)
        self.conn.commit()
        self.conn.select_db(dbname)
        self.cursor.execute(tablesql)
        self.conn.commit()

    def getcommondata(self, colname):
        """Return every ``data`` value stored under ``name == colname``.

        Args:
            colname: value matched against the ``name`` column of the table
                named by MYSQL_DATADB_COMDATATAB.

        Returns:
            The rows from ``cursor.fetchall()``; each row is a 1-tuple holding
            ``data``. Empty when nothing matches.
        """
        # The table name is trusted configuration; the value is bound by the
        # driver so it is always escaped correctly.
        sql = "SELECT data FROM {} WHERE name=%s ".format(MYSQL_DATADB_COMDATATAB)
        self.cursor.execute(sql, (colname,))
        return self.cursor.fetchall()

    def get_date_data(self, tabname):
        """Return ``(DATE(date), data)`` rows of ``tabname`` ordered by day.

        Args:
            tabname: name of the time-series table. It is interpolated into
                the statement, so it must come from trusted configuration.

        Returns:
            The rows from ``cursor.fetchall()``, oldest day first.
        """
        sql = "SELECT DATE(date),data FROM %s ORDER BY DATE(date)" % (tabname)
        self.cursor.execute(sql)
        return self.cursor.fetchall()


da = DataAccess()


# Uptime endpoint.
@bp.get("/startday")
def sysdays():
    """Return the number of days since the recorded start day (minimum 1).

    The start day is stored in the configuration table under the name
    ``startday``. If no such row exists, today's date is inserted and used.
    """
    # Read the clock once so that a request spanning midnight stays consistent.
    today = datetime.date.today()

    sql = "SELECT info FROM {} WHERE name=%s".format(da.tabname)
    da.cursor.execute(sql, ("startday",))
    result = da.cursor.fetchall()

    if not result:
        # No start day recorded yet: store today's date.
        # '%%' survives the driver's own %-formatting of the query as a
        # literal '%', giving MySQL the '%Y-%m-%d' format string.
        sql = "INSERT INTO {}(id,name,info) VALUES (1,%s,str_to_date(%s,'%%Y-%%m-%%d'))".format(
            da.tabname)
        da.cursor.execute(sql, ("startday", str(today)))
        da.conn.commit()
        sday = today
    else:
        sday = datetime.datetime.strptime(result[0][0], '%Y-%m-%d').date()

    # Day difference; anything below one day is reported as 1.
    difftime = (today - sday).days
    return {"code": 0, "data": {"count": difftime if difftime > 0 else 1}, "msg": "success"}


# IPv6 DNS count endpoint.
@bp.get("/v6dns")
def sysv6count():
    """Return the ``data`` value of the most recent day in the time-series table."""
    countlist = da.get_date_data(MYSQL_DATADB_DATETAB)
    # An empty table used to raise IndexError; report zero instead.
    count = countlist[-1][1] if countlist else 0
    return {"code": 0, "data": {"count": count}, "msg": "success"}


@bp.get("/dual")
def sysdualcount():
    """Return the ``count`` field of the JSON document stored under ``dualcount``."""
    rows = da.getcommondata("dualcount")
    if not rows:
        # No row stored yet: previously an IndexError; report zero instead.
        return {"code": 0, "data": {"count": 0}, "msg": "success"}
    # The stored value is a JSON string; parse it into a dict.
    c = json.loads(rows[0][0])
    return {"code": 0, "data": {"count": c['count']}, "msg": "success"}


@bp.get("/dcount")
def sysdatecount():
    """Return one ``{"date", "count"}`` entry per day, oldest day first."""
    countlist = da.get_date_data(MYSQL_DATADB_DATETAB)
    datecount = [
        {"date": c[0].strftime('%Y年%m月%d日'), "count": c[1]}
        for c in countlist
    ]
    # Return the per-day list.
    return {"code": 0, "data": {"list": datecount}, "msg": "success"}