import ptf import time import os import subprocess import shutil from urllib import request mrzcpd_run_dir = "/var/run/mrzcpd" class Mrzcpd: def __init__(self, conf_start, conf_dynamic): self.mrzcpd_path = ptf.testutils.test_param_get( "source_dir") + "/service/mrzcpd" self.conf_start = str(conf_start) self.conf_path_start = ptf.testutils.test_param_get( "source_dir") + "/test/ptf_test/mrglobal.conf" self.conf_dynamic = str(conf_dynamic) self.conf_path_dynamic = ptf.testutils.test_param_get( "source_dir") + "/test/ptf_test/mrglobal.dynamic.conf" self.file_start = open(self.conf_path_start, 'w') self.file_start.write(self.conf_start) self.file_start.close() self.file_dynamic = open(self.conf_path_dynamic, 'w') self.file_dynamic.write(self.conf_dynamic) self.file_dynamic.close() def start(self): # Create run dir if os.path.exists(mrzcpd_run_dir): shutil.rmtree(mrzcpd_run_dir) os.makedirs(mrzcpd_run_dir) # Start mrzcpd self.mrzcpd_process = subprocess.Popen( [self.mrzcpd_path, "-c", self.conf_path_start, "-s", self.conf_path_dynamic]) # check the mrzcpd http server start_timeout = 60 while start_timeout > 0: try: print("Try to connect to mrzcpd http server") response = request.urlopen('http://127.0.0.1:9086/probe') print(response.read()) break except: time.sleep(1) start_timeout = start_timeout - 1 def stop(self): self.mrzcpd_process.terminate() self.mrzcpd_process.wait()