import datetime
import json
import threading
import time

import pymysql
from neomodel import config, db
from neomodel.integration.pandas import to_dataframe
from pymysql.converters import escape_string
from schedule import repeat, every, run_pending, run_all, idle_seconds

from apps.model import NodeResolver53
from settings import *
from util import log

from .neo4jcommand import *


class DataSaver:
    def __init__(self):
        # MySQL connection
        dbname = MYSQL_DATADB
        self.tabname = MYSQL_DATADB_COMDATATAB
        self.v6dnstabname = MYSQL_DATADB_DATETAB
        self.conn = pymysql.connect(host=MYSQL_HOST, user='root', password=MYSQL_PAWD, port=MYSQL_PORT)
        self.cursor = self.conn.cursor()

        # SQL statements for initialization
        dbsql = "CREATE DATABASE IF NOT EXISTS %s" % dbname
        tablesql = """
            CREATE TABLE IF NOT EXISTS %s (
                id INT auto_increment PRIMARY KEY,
                name VARCHAR(50) NOT NULL UNIQUE,
                data JSON NOT NULL
            ) ENGINE=innodb DEFAULT CHARSET=utf8;
        """ % self.tabname
        v6dnstablesql = """
            CREATE TABLE IF NOT EXISTS %s (
                id INT auto_increment PRIMARY KEY,
                date DATE NOT NULL UNIQUE,
                data INT NOT NULL
            ) ENGINE=innodb DEFAULT CHARSET=utf8;
        """ % self.v6dnstabname

        # Execute the initialization SQL
        try:
            self.cursor.execute(dbsql)
            self.conn.commit()
            self.conn.select_db(dbname)
            self.cursor.execute(tablesql)
            self.cursor.execute(v6dnstablesql)
            self.conn.commit()
            log.info("Data Saver created")
        except Exception as e:
            log.error(e)


da = DataSaver()


@repeat(every().day)
def refresh_neo4j():
    log.info("Starting data refresh from neo4j")
    config.DATABASE_URL = NEO4J_URL
    db.set_connection(NEO4J_URL)

    # De-duplicate nodes
    db.cypher_query(distinct)
    log.info("De-duplication finished")

    # Rebuild the computation graph
    db.cypher_query(gds_delgraph)
    db.cypher_query(gds_newgraph)
    log.info("Computation graph refreshed")

    # Dual-stack count
    dual_countresult = db.cypher_query(dualcountcypher, retry_on_session_expire=True)[0][0]
    # Wrap the result as a JSON string
    param = {'count': dual_countresult[0]}
    d_param = json.dumps(param)
    sql = "REPLACE INTO %s(id,name,data) VALUES (1,'%s','%s')" % (
        da.tabname, "dualcount", escape_string(d_param))
    try:
        da.cursor.execute(sql)
        da.conn.commit()
        log.info("Dual-stack count data refreshed")
    except Exception as e:
        log.error(e)

    # Dual-stack detail data
    dual_dataresult = to_dataframe(db.cypher_query(dualdatacypher, retry_on_session_expire=True))
    res = dual_dataresult.to_json(orient="index")
    sql = "REPLACE INTO %s(id,name,data) VALUES (2,'%s','%s')" % (da.tabname, "dualdata", escape_string(res))
    try:
        da.cursor.execute(sql)
        da.conn.commit()
        log.info("Dual-stack detail data refreshed")
    except Exception as e:
        log.error(e)

    # v6dns count
    v6nodecountresult = len(NodeResolver53.nodes.filter(IPType="v6"))
    sql = "INSERT INTO %s(date,data) VALUES (str_to_date('%s','%%Y-%%m-%%d'),%s) ON DUPLICATE KEY UPDATE data=%s" % (
        da.v6dnstabname, str(datetime.date.today()), v6nodecountresult, v6nodecountresult)
    try:
        da.cursor.execute(sql)
        da.conn.commit()
        log.info("v6dns count data refreshed")
    except Exception as e:
        log.error(e)

    # Node-to-geography relation data
    all_geodataresult = to_dataframe(db.cypher_query(IP_relate, retry_on_session_expire=True))
    res = all_geodataresult.to_json(orient="index")
    sql = "REPLACE INTO %s(id,name,data) VALUES (3,'%s','%s')" % (da.tabname, "iprelate", escape_string(res))
    try:
        da.cursor.execute(sql)
        da.conn.commit()
        log.info("IP relation data refreshed")
    except Exception as e:
        log.error(e)

    # AS relation data
    ASN_relateresult = to_dataframe(db.cypher_query(AS_relate, retry_on_session_expire=True))
    res = ASN_relateresult.to_json(orient="index")
    sql = "REPLACE INTO %s(id,name,data) VALUES (4,'%s','%s')" % (da.tabname, "asnrelate", escape_string(res))
    try:
        da.cursor.execute(sql)
        da.conn.commit()
        log.info("ASN relation data refreshed")
    except Exception as e:
        log.error(e)

    # ISP relation data
    ISP_relateresult = to_dataframe(db.cypher_query(ISP_relate, retry_on_session_expire=True))
    res = ISP_relateresult.to_json(orient="index")
    sql = "REPLACE INTO %s(id,name,data) VALUES (5,'%s','%s')" % (da.tabname, "isprelate", escape_string(res))
    try:
        da.cursor.execute(sql)
        da.conn.commit()
        log.info("ISP relation data refreshed")
    except Exception as e:
        log.error(e)

    # AS distribution data
    AS_distresult = to_dataframe(db.cypher_query(AS_dist, retry_on_session_expire=True))
    res = AS_distresult.to_json(orient="index")
    sql = "REPLACE INTO %s(id,name,data) VALUES (6,'%s','%s')" % (da.tabname, "asndist", escape_string(res))
    try:
        da.cursor.execute(sql)
        da.conn.commit()
        log.info("AS distribution data refreshed")
    except Exception as e:
        log.error(e)

    # ISP distribution data
    ISP_distresult = to_dataframe(db.cypher_query(ISP_dist, retry_on_session_expire=True))
    res = ISP_distresult.to_json(orient="index")
    sql = "REPLACE INTO %s(id,name,data) VALUES (7,'%s','%s')" % (da.tabname, "ispdist", escape_string(res))
    try:
        da.cursor.execute(sql)
        da.conn.commit()
        log.info("ISP distribution data refreshed")
    except Exception as e:
        log.error(e)

    # Country distribution data
    COU_distresult = to_dataframe(db.cypher_query(cou_dist, retry_on_session_expire=True))
    res = COU_distresult.to_json(orient="index")
    sql = "REPLACE INTO %s(id,name,data) VALUES (8,'%s','%s')" % (da.tabname, "coudist", escape_string(res))
    try:
        da.cursor.execute(sql)
        da.conn.commit()
        log.info("Country distribution data refreshed")
    except Exception as e:
        log.error(e)

    # Fetch high-risk nodes and their neighbour nodes (result is not persisted here)
    DANGER_noderesult = to_dataframe(db.cypher_query(dangerous_nodes, retry_on_session_expire=True))

    log.info("Finished this refresh round; next refresh starts in " + str(int(idle_seconds())) + " seconds")


def run_continuously(interval=300):
    """Continuously run, while executing pending jobs at each
    elapsed time interval.

    @return cease_continuous_run: threading.Event which can
    be set to cease continuous run.

    Please note that it is *intended behavior that
    run_continuously() does not run missed jobs*. For example,
    if you've registered a job that should run every minute and
    you set a continuous run interval of one hour, then your job
    won't be run 60 times at each interval but only once.
    """
    cease_continuous_run = threading.Event()

    class ScheduleThread(threading.Thread):
        @classmethod
        def run(cls):
            # Run every job once at startup, then poll pending jobs.
            run_all()
            while not cease_continuous_run.is_set():
                try:
                    run_pending()
                    time.sleep(interval)
                except Exception as e:
                    time.sleep(interval)
                    print(e)

    continuous_thread = ScheduleThread()
    continuous_thread.start()
    return cease_continuous_run


def run():
    run_continuously(600)
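

# Minimal usage sketch (illustrative only; not part of the original module):
# run_continuously() returns a threading.Event, and setting that event stops
# the background scheduler loop after its current sleep. The __main__ guard
# and the KeyboardInterrupt handling below are assumptions about how this
# module might be driven directly rather than imported by the application.
if __name__ == "__main__":
    stop_event = run_continuously(600)
    try:
        # Keep the main thread alive while the scheduler thread does the work.
        while True:
            time.sleep(3600)
    except KeyboardInterrupt:
        # Signal the scheduler thread to exit its loop.
        stop_event.set()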