summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorhandingkang <[email protected]>2024-01-15 21:18:58 +0800
committerhandingkang <[email protected]>2024-01-15 21:18:58 +0800
commita6612e239f0eaebaa6124c79c7259609fd607439 (patch)
tree041d7e99d075a13418664d358967618848f46eb4
parent576b2ee0d35be920645ab5d93d29ae56a09b96ab (diff)
数据检索命令定义
-rw-r--r--script/neo4jcommand.py6
-rw-r--r--script/schedule.py96
2 files changed, 101 insertions, 1 deletions
diff --git a/script/neo4jcommand.py b/script/neo4jcommand.py
index 022aa5d..e825d26 100644
--- a/script/neo4jcommand.py
+++ b/script/neo4jcommand.py
@@ -67,6 +67,10 @@ dualdatacypher = '''
YIELD nodeId,componentId
where componentId in ccl
with gds.util.asNode(nodeId) as n,componentId
- return n.IP,n.ISP,n.COU,n.CCODE,n.PROV,n.LAT,n.LNG,componentId
+ return n.IP,n.ISP,n.COU,n.CCODE,n.PROV,n.LAT,n.LNG,componentId limit 30
order by componentId
'''
+
+# v6dns计数
+v6count = '''
+match (n:NodeResolver53) where n.IPType contains "6" return count(n)'''
diff --git a/script/schedule.py b/script/schedule.py
index e69de29..c8583b5 100644
--- a/script/schedule.py
+++ b/script/schedule.py
@@ -0,0 +1,96 @@
+import threading
+import time
+
+import pymysql
+from neomodel import config, db
+from neomodel.integration.pandas import to_dataframe
+from schedule import repeat, every, run_pending
+
+from apps.model import NodeResolver53
+from neo4jcommand import *
+
+
+class DataSaver():
+ def __init__(self):
+ # mysql连接
+ dbname = "v6dnsminerdata"
+ self.tabname = "data"
+ self.conn = pymysql.connect(host='localhost', user='root', password='Hdk19990815')
+ self.cursor = self.conn.cursor()
+
+ # 初始化sql语句
+ dbsql = "CREATE DATABASE IF NOT EXISTS %s" % dbname
+
+ tablesql = """
+ CREATE TABLE IF NOT EXISTS %s (
+ id INT auto_increment PRIMARY KEY ,
+ name VARCHAR(50) NOT NULL UNIQUE,
+ data JSON NOT NULL ,
+ )ENGINE=innodb DEFAULT CHARSET=utf8; """ % self.tabname
+
+ # 执行sql语句
+ self.cursor.execute(dbsql)
+ self.conn.select_db(dbname)
+ self.cursor.execute(tablesql)
+
+
+da = DataSaver()
+
+
+@repeat(every().day)
+def refresh_neo4j():
+ url = "neo4j://neo4j:[email protected]:7678"
+ config.DATABASE_URL = url
+ db.set_connection(url)
+ # 去重
+ db.cypher_query(distinct)
+
+ # v6dns计数
+ result = len(NodeResolver53.nodes.filter(IPType="v6"))
+ sql = "REPLACE INTO %s (name, data) VALUES(%s,%s)" % (da.tabname, "v6dnscount", result)
+ da.cursor.execute(sql)
+
+ # 建立计算图
+ db.cypher_query(gds_delgraph)
+ db.cypher_query(gds_newgraph)
+
+ # 双栈计数
+ dual_countresult = db.cypher_query(dualcountcypher, retry_on_session_expire=True)[0][0]
+ sql = "REPLACE INTO %s(name,data) VALUES (%s,'{%s:%s}')" % (
+ da.tabname, "dualcount", "count", str(dual_countresult))
+ da.cursor.execute(sql)
+
+ # 双栈信息
+ dual_dataresult = to_dataframe(db.cypher_query(dualdatacypher, retry_on_session_expire=True))
+ res = dual_dataresult.to_json(result)
+ sql = "REPLACE INTO %s(name,data) VALUES (%s,%s)" % (da.tabname, "dualdata", res)
+ da.cursor.execute(sql)
+
+
+def run_continuously(interval=300):
+ """Continuously run, while executing pending jobs at each
+ elapsed time interval.
+ @return cease_continuous_run: threading. Event which can
+ be set to cease continuous run. Please note that it is
+ *intended behavior that run_continuously() does not run
+ missed jobs*. For example, if you've registered a job that
+ should run every minute and you set a continuous run
+ interval of one hour then your job won't be run 60 times
+ at each interval but only once.
+ """
+ cease_continuous_run = threading.Event()
+
+ class ScheduleThread(threading.Thread):
+ @classmethod
+ def run(cls):
+ while not cease_continuous_run.is_set():
+ run_pending()
+ time.sleep(interval)
+
+ continuous_thread = ScheduleThread()
+ continuous_thread.start()
+ return cease_continuous_run
+
+
+def run():
+ run_continuously(600)