summaryrefslogtreecommitdiff
path: root/detection/tool/KnowledgeBaseTool.py
blob: a8079d41c0069f1f3c8028726d8265106dd61d11 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2023/6/12 16:09
# @author    : yinjinagyi
# @File    : KnowledgeBaseTool.py
# @Function:

import json
import sys
import time
from warnings import simplefilter
import requests
import sys
sys.path.append('..')

from tool.LoggingTool import Logger
from urllib.parse import quote

logger = Logger().getLogger()
simplefilter(action='ignore', category=FutureWarning)


class KnowledgeApi:
    def __init__(self, config):
        self.api_address = config['host']
        self.api_username = config['kb_username']
        self.api_pin = config['api_pin']
        self.api_path = config['api_path']
        self.retry_max = config['api_retry_times']
        self.request_timeout = config['api_timeout']

        self.api_token = config['api_token']

    def get_api_token(self):
        url = "http://" + self.api_address + "/sys/login"
        data = {
            "username": self.api_username,
            "pin": self.api_pin
        }
        try:
            resp = requests.post(url, data=json.dumps(data),
                                 headers={'Content-Type': 'application/json'}, timeout=self.request_timeout)
            return json.loads(resp.text).get('data').get('token')
        except Exception as e:
            logger.error(e)
            sys.exit()

    def get_library_id(self, library_name):
        global resp
        url = "http://" + self.api_address + "/v1/knowledgeBase/list?name=" + library_name
        header = {
            "Cn-Authorization": self.api_token
        }
        try:
            resp = requests.get(url, headers=header, timeout=self.request_timeout)
            return json.loads(resp.text).get('data').get('list')[0].get('knowledgeId')
        except Exception as e:
            logger.error(resp.text)
            logger.error(e)
            sys.exit()


    def file_import(self, file_path, action, description=''):
        url = 'http://' + self.api_address + self.api_path
        file = open(file_path, "rb")
        file_object = {"file": file}

        param = {
            "action": action,
            "description": description
        }

        header = {
            "Cn-Authorization": self.api_token
        }

        is_succeed = 0
        response_code = -1
        for i in range(self.retry_max):
            try:
                resp = requests.post(url, files=file_object, params=param,
                                     headers=header, timeout=self.request_timeout)
                resp = json.loads(resp.text)
                logger.info(resp)
                response_code = resp.get('code')
                if response_code == 200:
                    is_succeed = 1
                    break
            except requests.RequestException as e:
                time.sleep(5)
                logger.error(e)
                continue

        if not is_succeed:
            logger.error('Import Failed!')
            sys.exit()
        else:
            logger.info('Import succeed. Response code {}.'.format(response_code))

        file.close()


    def get_knowledgebase_count(self, knowledge_id, page_size=None, page_no=None, q=None):
        url = 'http://' + self.api_address + '/v1/knowledgeBase/' + str(knowledge_id)
        q = quote(q, 'utf-8')
        param = {
            "pageNo": page_no,
            "pageSize": page_size

        }
        url += ('?q=' + q)

        header = {
            "Cn-Authorization": self.api_token
        }
        try:
            resp = requests.get(url, params=param, headers=header, timeout=self.request_timeout)
            return json.loads(resp.text).get('data').get('total')
        except Exception as e:
            logger.error(e)
            sys.exit()