# Source: script/sched.py (blob 60eaf5bdf557b0c726c9b17a663f984cf1c43e04)
import datetime
import json
import threading
import time

import pymysql
from neomodel import config, db
from neomodel.integration.pandas import to_dataframe
from pymysql.converters import escape_string
from schedule import repeat, every, run_pending, run_all, idle_seconds

from apps.model import NodeResolver53
from settings import *
from util import log
from .neo4jcommand import *


class DataSaver:
    """Holds the MySQL connection and cursor used to persist refreshed data.

    Constructing an instance connects to the MySQL server and then makes
    sure the target database and its two tables exist: one keyed by a
    unique name holding a JSON document, one keyed by a unique date holding
    an integer.  Errors raised while creating the schema are logged, not
    re-raised; a failure to connect propagates to the caller.
    """

    def __init__(self):
        # MySQL connection (opened without a default database; the database
        # is selected below once it is known to exist).
        database = MYSQL_DATADB
        self.tabname = MYSQL_DATADB_COMDATATAB
        self.v6dnstabname = MYSQL_DATADB_DATETAB
        self.conn = pymysql.connect(
            host=MYSQL_HOST,
            user='root',
            password=MYSQL_PAWD,
            port=MYSQL_PORT,
        )
        self.cursor = self.conn.cursor()

        # Schema statements
        create_database = "CREATE DATABASE IF NOT EXISTS %s" % database

        create_json_table = """
                CREATE TABLE IF NOT EXISTS %s (
                id INT auto_increment PRIMARY KEY ,
                name VARCHAR(50) NOT NULL UNIQUE,
                data JSON NOT NULL 
                )ENGINE=innodb DEFAULT CHARSET=utf8; """ % self.tabname
        create_date_table = """
                        CREATE TABLE IF NOT EXISTS %s (
                        id INT auto_increment PRIMARY KEY ,
                        date DATE NOT NULL UNIQUE,
                        data INT NOT NULL 
                        )ENGINE=innodb DEFAULT CHARSET=utf8; """ % self.v6dnstabname

        # Execute the statements: database first, then switch to it and
        # create both tables.
        try:
            self.cursor.execute(create_database)
            self.conn.commit()
            self.conn.select_db(database)
            for statement in (create_json_table, create_date_table):
                self.cursor.execute(statement)
            self.conn.commit()
            log.info("Data Saver created")
        except Exception as exc:
            log.error(exc)


# Module-level DataSaver shared by the scheduled job below. Constructing it
# here means the MySQL connection is opened when this module is imported.
da = DataSaver()


def _save_to_mysql(sql, args, done_msg):
    """Run one write statement on the shared MySQL connection and commit it.

    :param sql: statement text using ``%s`` placeholders for values.
    :param args: tuple of values; they are bound by the driver instead of
        being formatted into the SQL text.
    :param done_msg: message logged at info level after a successful commit.

    Any exception is logged and swallowed so that one failed write does not
    abort the remaining steps of the refresh.
    """
    try:
        # The job runs once a day on a connection opened at import time;
        # ping() re-establishes it if it is no longer alive.
        da.conn.ping(reconnect=True)
        # The connection was opened without a default database, so a fresh
        # session starts with none selected; select it again before writing.
        da.conn.select_db(MYSQL_DATADB)
        da.cursor.execute(sql, args)
        da.conn.commit()
        log.info(done_msg)
    except Exception as e:
        log.error(e)


def _store_json(row_id, name, payload, done_msg):
    """Replace the row with the given id/name in the JSON table with *payload*."""
    # Only the table name is formatted into the text; values are bound.
    sql = "REPLACE INTO {}(id,name,data) VALUES (%s,%s,%s)".format(da.tabname)
    _save_to_mysql(sql, (row_id, name, payload), done_msg)


def _query_as_json(cypher):
    """Run a Cypher query and return the result as index-oriented JSON text."""
    frame = to_dataframe(db.cypher_query(cypher, retry_on_session_expire=True))
    return frame.to_json(orient="index")


@repeat(every().day)
def refresh_neo4j():
    """Daily job: recompute statistics from Neo4j and store them in MySQL.

    Steps, in order: run the de-duplication query, drop and recreate the
    computation graph, then run each statistics query and write its result
    to MySQL.  JSON results go to fixed rows (ids 1-8) of the name/data
    table; the count of ``NodeResolver53`` nodes with ``IPType="v6"`` is
    upserted into the per-date table under today's date.

    MySQL write failures are logged and do not stop the run.  Exceptions
    raised by the Neo4j queries are not caught here and propagate.
    """
    log.info("开始从neo4j刷新数据")
    config.DATABASE_URL = NEO4J_URL
    db.set_connection(NEO4J_URL)
    # De-duplicate
    db.cypher_query(distinct)
    log.info("完成去重")

    # Rebuild the computation graph (delete, then create)
    db.cypher_query(gds_delgraph)
    db.cypher_query(gds_newgraph)
    log.info("完成计算图刷新")

    # Dual-stack count: first column of the first result row, wrapped as JSON
    dual_count_row = db.cypher_query(dualcountcypher, retry_on_session_expire=True)[0][0]
    _store_json(1, "dualcount", json.dumps({'count': dual_count_row[0]}), "完成双栈统计数据刷新")

    # Dual-stack details
    _store_json(2, "dualdata", _query_as_json(dualdatacypher), "完成双栈信息数据刷新")

    # v6 DNS count, one row per calendar day (updated in place on re-runs)
    v6_count = len(NodeResolver53.nodes.filter(IPType="v6"))
    _save_to_mysql(
        "INSERT INTO {}(date,data) VALUES (%s,%s) ON DUPLICATE KEY UPDATE data=%s".format(da.v6dnstabname),
        (datetime.date.today(), v6_count, v6_count),
        "完成v6dns统计数据刷新",
    )

    # Remaining datasets share one shape: query -> JSON -> fixed row.
    # Each query runs only after the previous result has been written.
    datasets = (
        (3, "iprelate", IP_relate, "完成IP关联信息数据刷新"),      # node-related geo data
        (4, "asnrelate", AS_relate, "完成ASN关联信息数据刷新"),    # AS relations
        (5, "isprelate", ISP_relate, "完成ISP关联信息数据刷新"),   # ISP relations
        (6, "asndist", AS_dist, "完成AS分布数据刷新"),             # AS distribution
        (7, "ispdist", ISP_dist, "完成ISP分布数据刷新"),           # ISP distribution
        (8, "coudist", cou_dist, "完成国家分布数据刷新"),          # country distribution
    )
    for row_id, name, cypher, done_msg in datasets:
        _store_json(row_id, name, _query_as_json(cypher), done_msg)

    # Fetch high-risk nodes and their neighbours.
    # NOTE(review): the result is not stored or used anywhere yet; the query is
    # still executed so the job's behaviour (including any error) is unchanged.
    to_dataframe(db.cypher_query(dangerous_nodes, retry_on_session_expire=True))

    log.info("完成数据一轮刷新,下一次刷新开始于: " + str(int(idle_seconds())) + "秒后")


def run_continuously(interval=300):
    """Run scheduled jobs on a background thread.

    The thread first runs every registered job once, then checks for
    pending jobs every *interval* seconds until the returned event is set.

    Missed runs are intentionally not replayed: a job registered to run
    every minute with an interval of one hour runs once per interval, not
    60 times.

    :param interval: seconds to wait between checks for pending jobs.
    :return: a ``threading.Event``; set it to stop the background loop.
    """
    cease_continuous_run = threading.Event()

    def _loop():
        # Guard the start-up run as well: an exception escaping here would
        # end the thread before the polling loop ever started.
        try:
            run_all()
        except Exception as e:
            log.error(e)

        while not cease_continuous_run.is_set():
            try:
                run_pending()
            except Exception as e:
                log.error(e)
            # Unlike time.sleep(), Event.wait() returns as soon as the event
            # is set, so a stop request takes effect without waiting out the
            # rest of the interval.
            cease_continuous_run.wait(interval)

    continuous_thread = threading.Thread(target=_loop)
    continuous_thread.start()
    return cease_continuous_run


def run():
    """Start the background scheduler, checking for pending jobs every 600 seconds."""
    poll_seconds = 600
    run_continuously(interval=poll_seconds)