blob: edb85c1ad2469744b07141001a7773172c283bcc (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
import ptf
import time
import os
import subprocess
import shutil
from urllib import request
mrzcpd_run_dir = "/var/run/mrzcpd"
class Mrzcpd:
def __init__(self, conf_start, conf_dynamic):
self.mrzcpd_path = ptf.testutils.test_param_get(
"source_dir") + "/service/mrzcpd"
self.conf_start = str(conf_start)
self.conf_path_start = ptf.testutils.test_param_get(
"source_dir") + "/test/ptf_test/mrglobal.conf"
self.conf_dynamic = str(conf_dynamic)
self.conf_path_dynamic = ptf.testutils.test_param_get(
"source_dir") + "/test/ptf_test/mrglobal.dynamic.conf"
self.file_start = open(self.conf_path_start, 'w')
self.file_start.write(self.conf_start)
self.file_start.close()
self.file_dynamic = open(self.conf_path_dynamic, 'w')
self.file_dynamic.write(self.conf_dynamic)
self.file_dynamic.close()
def start(self):
# Create run dir
if os.path.exists(mrzcpd_run_dir):
shutil.rmtree(mrzcpd_run_dir)
os.makedirs(mrzcpd_run_dir)
# Start mrzcpd
self.mrzcpd_process = subprocess.Popen(
[self.mrzcpd_path, "-c", self.conf_path_start, "-s", self.conf_path_dynamic])
# check the mrzcpd http server
start_timeout = 60
while start_timeout > 0:
try:
print("Try to connect to mrzcpd http server")
response = request.urlopen('http://127.0.0.1:9086/probe')
print(response.read())
break
except:
time.sleep(1)
start_timeout = start_timeout - 1
def stop(self):
self.mrzcpd_process.terminate()
self.mrzcpd_process.wait()
|