diff options
| author | handingkang <[email protected]> | 2024-01-15 21:18:58 +0800 |
|---|---|---|
| committer | handingkang <[email protected]> | 2024-01-15 21:18:58 +0800 |
| commit | a6612e239f0eaebaa6124c79c7259609fd607439 (patch) | |
| tree | 041d7e99d075a13418664d358967618848f46eb4 | |
| parent | 576b2ee0d35be920645ab5d93d29ae56a09b96ab (diff) | |
数据检索命令定义
| -rw-r--r-- | script/neo4jcommand.py | 6 | ||||
| -rw-r--r-- | script/schedule.py | 96 |
2 files changed, 101 insertions, 1 deletions
diff --git a/script/neo4jcommand.py b/script/neo4jcommand.py index 022aa5d..e825d26 100644 --- a/script/neo4jcommand.py +++ b/script/neo4jcommand.py @@ -67,6 +67,10 @@ dualdatacypher = ''' YIELD nodeId,componentId where componentId in ccl with gds.util.asNode(nodeId) as n,componentId - return n.IP,n.ISP,n.COU,n.CCODE,n.PROV,n.LAT,n.LNG,componentId + return n.IP,n.ISP,n.COU,n.CCODE,n.PROV,n.LAT,n.LNG,componentId limit 30 order by componentId ''' + +# v6dns计数 +v6count = ''' +match (n:NodeResolver53) where n.IPType contains "6" return count(n)''' diff --git a/script/schedule.py b/script/schedule.py index e69de29..c8583b5 100644 --- a/script/schedule.py +++ b/script/schedule.py @@ -0,0 +1,96 @@ +import threading +import time + +import pymysql +from neomodel import config, db +from neomodel.integration.pandas import to_dataframe +from schedule import repeat, every, run_pending + +from apps.model import NodeResolver53 +from neo4jcommand import * + + +class DataSaver(): + def __init__(self): + # mysql连接 + dbname = "v6dnsminerdata" + self.tabname = "data" + self.conn = pymysql.connect(host='localhost', user='root', password='Hdk19990815') + self.cursor = self.conn.cursor() + + # 初始化sql语句 + dbsql = "CREATE DATABASE IF NOT EXISTS %s" % dbname + + tablesql = """ + CREATE TABLE IF NOT EXISTS %s ( + id INT auto_increment PRIMARY KEY , + name VARCHAR(50) NOT NULL UNIQUE, + data JSON NOT NULL , + )ENGINE=innodb DEFAULT CHARSET=utf8; """ % self.tabname + + # 执行sql语句 + self.cursor.execute(dbsql) + self.conn.select_db(dbname) + self.cursor.execute(tablesql) + + +da = DataSaver() + + +@repeat(every().day) +def refresh_neo4j(): + url = "neo4j://neo4j:[email protected]:7678" + config.DATABASE_URL = url + db.set_connection(url) + # 去重 + db.cypher_query(distinct) + + # v6dns计数 + result = len(NodeResolver53.nodes.filter(IPType="v6")) + sql = "REPLACE INTO %s (name, data) VALUES(%s,%s)" % (da.tabname, "v6dnscount", result) + da.cursor.execute(sql) + + # 建立计算图 + db.cypher_query(gds_delgraph) + db.cypher_query(gds_newgraph) + + # 双栈计数 + dual_countresult = db.cypher_query(dualcountcypher, retry_on_session_expire=True)[0][0] + sql = "REPLACE INTO %s(name,data) VALUES (%s,'{%s:%s}')" % ( + da.tabname, "dualcount", "count", str(dual_countresult)) + da.cursor.execute(sql) + + # 双栈信息 + dual_dataresult = to_dataframe(db.cypher_query(dualdatacypher, retry_on_session_expire=True)) + res = dual_dataresult.to_json(result) + sql = "REPLACE INTO %s(name,data) VALUES (%s,%s)" % (da.tabname, "dualdata", res) + da.cursor.execute(sql) + + +def run_continuously(interval=300): + """Continuously run, while executing pending jobs at each + elapsed time interval. + @return cease_continuous_run: threading. Event which can + be set to cease continuous run. Please note that it is + *intended behavior that run_continuously() does not run + missed jobs*. For example, if you've registered a job that + should run every minute and you set a continuous run + interval of one hour then your job won't be run 60 times + at each interval but only once. + """ + cease_continuous_run = threading.Event() + + class ScheduleThread(threading.Thread): + @classmethod + def run(cls): + while not cease_continuous_run.is_set(): + run_pending() + time.sleep(interval) + + continuous_thread = ScheduleThread() + continuous_thread.start() + return cease_continuous_run + + +def run(): + run_continuously(600) |
