1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
|
import datetime
import json
import threading
import time
import pymysql
from neomodel import config, db
from neomodel.integration.pandas import to_dataframe
from pymysql.converters import escape_string
from schedule import repeat, every, run_pending, run_all, idle_seconds
from apps.model import NodeResolver53
from settings import *
from util import log
from .neo4jcommand import *
class DataSaver:
    """Hold the shared MySQL connection and make sure the storage schema exists.

    Attributes set by the constructor:
        tabname: name of the table holding named JSON payloads
            (``name`` is unique, ``data`` is a JSON column).
        v6dnstabname: name of the table holding one integer per date
            (``date`` is unique, ``data`` is an INT column).
        conn: the open ``pymysql`` connection.
        cursor: a cursor created from ``conn``.
    """

    def __init__(self):
        """Connect to MySQL and create the database and both tables if missing.

        Connection errors propagate to the caller. Errors raised while
        creating the schema are logged and not re-raised.
        """
        # MySQL connection
        database = MYSQL_DATADB
        self.tabname = MYSQL_DATADB_COMDATATAB
        self.v6dnstabname = MYSQL_DATADB_DATETAB
        self.conn = pymysql.connect(
            host=MYSQL_HOST,
            user='root',
            password=MYSQL_PAWD,
            port=MYSQL_PORT,
        )
        self.cursor = self.conn.cursor()

        # Build the schema statements
        create_database = "CREATE DATABASE IF NOT EXISTS {}".format(database)
        create_json_table = (
            "\n"
            "CREATE TABLE IF NOT EXISTS {} (\n"
            "id INT auto_increment PRIMARY KEY ,\n"
            "name VARCHAR(50) NOT NULL UNIQUE,\n"
            "data JSON NOT NULL\n"
            ")ENGINE=innodb DEFAULT CHARSET=utf8; "
        ).format(self.tabname)
        create_date_table = (
            "\n"
            "CREATE TABLE IF NOT EXISTS {} (\n"
            "id INT auto_increment PRIMARY KEY ,\n"
            "date DATE NOT NULL UNIQUE,\n"
            "data INT NOT NULL\n"
            ")ENGINE=innodb DEFAULT CHARSET=utf8; "
        ).format(self.v6dnstabname)

        # Execute the schema statements
        try:
            self.cursor.execute(create_database)
            self.conn.commit()
            # The connection was opened without a default database, so it has
            # to be selected before the tables can be created in it.
            self.conn.select_db(database)
            for statement in (create_json_table, create_date_table):
                self.cursor.execute(statement)
            self.conn.commit()
            log.info("Data Saver created")
        except Exception as e:
            log.error(e)
# Module-level DataSaver instance used by the scheduled job below; creating it
# opens the MySQL connection as soon as this module is executed.
da = DataSaver()
def _store_refresh_result(sql, args, done_message):
    """Run one write statement on the shared MySQL connection and commit it.

    Failures are logged and swallowed so that one failed write does not stop
    the remaining statistics from being refreshed.

    :param sql: statement text using ``%s`` placeholders for the values.
    :param args: tuple of values the driver binds to the placeholders.
    :param done_message: text logged at info level after a successful commit.
    """
    try:
        # The job runs once a day on a connection opened at import time, so
        # the server may have dropped it in between; ping() reconnects.
        da.conn.ping(reconnect=True)
        # The connection is opened without a default database, so select it
        # again in case ping() had to reconnect.
        da.conn.select_db(MYSQL_DATADB)
        da.cursor.execute(sql, args)
        da.conn.commit()
        log.info(done_message)
    except Exception as e:
        log.error(e)


@repeat(every().day)
def refresh_neo4j():
    """Refresh the MySQL statistics tables from the Neo4j database.

    Registered with the scheduler to run every day. The job runs the
    de-duplication and graph-rebuild Cypher statements, then stores three
    results in MySQL: the dual-stack count (row id 1), the dual-stack data as
    JSON (row id 2) and today's count of ``NodeResolver53`` nodes whose
    ``IPType`` is ``"v6"``.

    MySQL write failures are logged and do not abort the remaining steps.
    Errors raised by the Neo4j queries propagate to the caller.
    """
    log.info("开始从neo4j刷新数据")
    config.DATABASE_URL = NEO4J_URL
    db.set_connection(NEO4J_URL)

    # De-duplicate
    db.cypher_query(distinct)
    log.info("完成去重")

    # Rebuild the computation graph
    db.cypher_query(gds_delgraph)
    db.cypher_query(gds_newgraph)
    log.info("完成计算图刷新")

    # Table names cannot be bound as parameters, so they are formatted into
    # the statement; all values are passed separately and bound by the driver.

    # Dual-stack count: first column of the first result row, wrapped as JSON
    first_row = db.cypher_query(dualcountcypher, retry_on_session_expire=True)[0][0]
    count_json = json.dumps({'count': first_row[0]})
    _store_refresh_result(
        "REPLACE INTO {}(id,name,data) VALUES (1,%s,%s)".format(da.tabname),
        ("dualcount", count_json),
        "完成双栈统计数据刷新",
    )

    # Dual-stack data
    dual_frame = to_dataframe(db.cypher_query(dualdatacypher, retry_on_session_expire=True))
    data_json = dual_frame.to_json(orient="index")
    _store_refresh_result(
        "REPLACE INTO {}(id,name,data) VALUES (2,%s,%s)".format(da.tabname),
        ("dualdata", data_json),
        "完成双栈信息数据刷新",
    )

    # v6 DNS count, stored once per calendar day (updated if the row exists)
    v6_count = len(NodeResolver53.nodes.filter(IPType="v6"))
    # '%%' becomes a literal '%' when the driver binds the parameters, giving
    # the '%Y-%m-%d' format string that str_to_date expects.
    _store_refresh_result(
        "INSERT INTO {}(date,data) VALUES (str_to_date(%s,'%%Y-%%m-%%d'),%s) "
        "ON DUPLICATE KEY UPDATE data=%s".format(da.v6dnstabname),
        (str(datetime.date.today()), v6_count, v6_count),
        "完成v6dns统计数据刷新",
    )

    # NOTE(review): idle_seconds() is read while this job is still running;
    # confirm the scheduler has already advanced the next run time at this
    # point, otherwise the logged value may be zero or negative.
    log.info("完成数据一轮刷新,下一次刷新开始于: " + str(int(idle_seconds())) + "秒后")
def run_continuously(interval=300):
    """Run every job once, then keep running pending jobs in a background thread.

    All registered jobs are executed immediately when the thread starts.
    After that, pending jobs are checked once per ``interval``. Missed runs
    are intentionally not caught up: a job scheduled every minute with an
    interval of one hour runs once per interval, not 60 times.

    Exceptions raised by jobs are logged and do not stop the thread.

    :param interval: seconds to wait between checks for pending jobs.
    :return: a ``threading.Event``; set it to make the background loop stop.
    """
    cease_continuous_run = threading.Event()

    class ScheduleThread(threading.Thread):
        def run(self):
            # A failure during the initial run must not kill the thread,
            # otherwise no later scheduled run would ever happen.
            try:
                run_all()
            except Exception as e:
                log.error(e)
            while not cease_continuous_run.is_set():
                try:
                    run_pending()
                except Exception as e:
                    log.error(e)
                # wait() returns as soon as the event is set, so a stop
                # request does not have to sit out the rest of the interval.
                cease_continuous_run.wait(interval)

    continuous_thread = ScheduleThread()
    continuous_thread.start()
    return cease_continuous_run
def run():
    """Start the background scheduler, checking for pending jobs every 600 seconds.

    The stop event returned by ``run_continuously`` is discarded.
    """
    run_continuously(interval=600)
|