# -*- coding: utf-8 -*- """ @file run_tests.py @brief provides a class to collect test information and a method to loop over each test and execute them. @par History - Brian Helgans, Dec 11 2019, initial version """ import os import multiprocessing from Sample_Execution.exceptions import AlgorithmExecutionError class Test: """ a test consists of variables which will eventually populate the environment for a run, and an optional summary. A run method is expected to be appended to thes object after instantiation.""" def __init__(self): ''' Creates an empty test. ''' self.dictionary = dict() """Dict[str : str]. An empty dictionary (key:value) that will later hold all the settings necessary for the test. """ self.summary = None def update(self, a_dictionary): ''' Updates the test dictionary which will be used to set the environment. :param a_dictionary: used to set the environment later :type a_dictionary: dict ''' self.dictionary.update(a_dictionary) def set_env(self): ''' sets the environment right before a run. ''' os.environ.update(self.dictionary) # a run method will be needed for run_tests. #run() #run.schedule() def run_tests(tests): ''' Iterates over a list of tests. All tests must have a run method. :param tests: list of tests :type tests: list[Test] ''' #save the environment. We want to reset to this after each test. old_environ = os.environ.copy() #loop over all day tests. for test in tests: #consider setting this inside a context manager in case we decide to catch and recover from exceptions. test.set_env() #this just separates things for human readability. It's not really an event so it's not logged with the logger. line = "{:{fill}^120}".format("",fill="-") print(line) print("{0:{fill}^120}".format(" Starting a new test: "+str(test.run.testname)+" ",fill="-")) print(line) #sometimes the test comes with a summary if test.summary is not None: print("Test Summary: " + test.summary) test.run.schedule() os.environ.clear() os.environ.update(old_environ) def run_test_subprocess(test): test.set_env() test.run.schedule() def run_tests_in_parallel(tests, max_processes): with multiprocessing.Pool(int(max_processes)) as pool: res = pool.map_async(run_test_subprocess,tests) pool.close() pool.join() if not res.successful(): raise AlgorithmExecutionError(message="Failure in a job")