# -*- coding: utf-8 -*-
"""
@file parallel.py
@brief Example python 3 script to schedule a parallel processed job
@par History
- Arthur Russakoff, Jul 26 2019, initial version
- Brian Helgans, Dec 02 2019, updated to add log name as input
- Brian Helgans, Dec 17 2019, rewritten as a class
"""
# Sample_Execution is located here
import os
from pathlib import Path
import datetime
import multiprocessing
import subprocess
import json

from . import log as Log
from .timer import Timer, timethis
from .exceptions import AlgorithmExecutionError

"""
Once initialized and scheduled, the algorithm will run.
Operational users should feel free to replace this module in its entirety.
"""


class Parallel:
    '''
    Example of parallel execution. Catches any non-zero return code from the
    executable and aborts the run in that event.
    '''

    def __init__(self, executable, config, total_segments, max_parallel_process,
                 job_id=None, log_path=None, testname=None):
        """
        All process run parameters must be set.

        :param executable: executable name (e.g. algorithms.exe)
        :type executable: str
        :param config: main configuration file (e.g. Config.xml)
        :type config: str
        :param total_segments: total segments to process as string or integer
        :type total_segments: Union[int,str]
        :param max_parallel_process: maximum jobs to be run in parallel as
            string or integer
        :type max_parallel_process: Union[int,str]
        :param job_id: a unique string identifier for the overall job.
            Defaults to the pattern: framework_job_%d%d%d%d%d%d_%d_%s
        :type job_id: str
        :param log_path: a path where the log will be stored. Default: "./"
        :type log_path: str
        :param testname: name of the test. Defaults to job_id
        :type testname: str
        """
        self.exe = executable
        self.cfg = config
        # accept strings or integers, as documented above
        self.max_jobs = int(max_parallel_process)
        self.total_seg = int(total_segments)

        if job_id is None:
            self.job_id = self._generate_job_id()
        else:
            self.job_id = job_id

        self.log_path = Path("./") if log_path is None else Path(log_path)

        # default the test name to the job id, as documented above
        if testname is None:
            self.testname = self.job_id
        else:
            self.testname = testname

    @timethis
    def schedule(self):
        """
        Schedules a parallel process consisting of preprocessing, parallel
        segment processing, and post processing.

        NOTE: operational users should feel free to replace this module in
        its entirety.
        """
        __func = "Parallel.schedule"

        # the setup
        Log.info(__func, "Starting job: " + self.job_id)
        Log.info(__func, "Creating Logs in: " + str(self.log_path))
        self.log_path.mkdir(parents=True, exist_ok=True)

        # backing up the environment in a log
        with open(self.log_path / "environment.txt", "w") as outfile:
            line = "{:{fill}^120}".format("", fill="-")
            title = "{:{fill}^120}".format("environment:", fill="-")
            print(line, file=outfile)
            print(title, file=outfile)
            print(line, file=outfile)
            print("environment=" + json.dumps(dict(os.environ)), file=outfile)
            print(line, file=outfile)
            print("configuration: ", self.cfg, file=outfile)
            print("max parallel: ", self.max_jobs, file=outfile)
            print(line, file=outfile)

        # these execute
        self._preproc()
        self._segmentScheduler()
        self._postproc()
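    # Command-line contract shared by the three phases below (taken from the
    # subprocess.run argument lists):
    #   preprocessing:   <executable> <config> -j <job_id> -m pre
    #   segment N:       <executable> <config> -j <job_id> -s <N>
    #   postprocessing:  <executable> <config> -j <job_id> -m post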
""" try: with open(self.log_path / "pre_log.txt", "w") as outfile: print("configuration: ", self.cfg, file=outfile, flush=True) subprocess.run([self.exe, self.cfg, "-j", self.job_id, "-m", "pre"], stdin=None, check=True, stdout=outfile, stderr=outfile, shell=False) except Exception as e: raise AlgorithmExecutionError(e, "Preprocessing failed") def _segment(self, seg): """ schedules a single parallel segment NOTE: operational users should feel free to replace this module in it's entirety. """ __func = "Parallel._segment" segname = str(seg) timer = Timer(__func + " " + segname, log=False) try: with open(self.log_path / ("seg_" + segname + "_log.txt"), "w") as outfile: print("configuration: ", self.cfg, file=outfile, flush=True) # copy this below to run valgrind: "valgrind", "--tool=memcheck", "--leak-check=full", # "--show-reachable=yes", "--error-limit=no", "--log-file=valgrid.out", # copy this below to run cachegrind: "valgrind", "--tool=callgrind", subprocess.run([self.exe, self.cfg, "-j", self.job_id, "-s", segname], stdin=None, check=True, stdout=outfile, stderr=outfile, shell=False) except Exception as e: raise AlgorithmExecutionError(e, "segment " + segname + " processing failed") Log.info(__func, "segment " + segname + " successfully ran for: " + timer.stop(log=False)) @timethis def _segmentScheduler(self): """ schedules the segments as parallel processes. NOTE: operational users should feel free to replace this module in it's entirety. """ Log.info("Parallel._segmentScheduler", "Processing " + str(self.total_seg) + " segments in parallel, " + str(self.max_jobs) + " at a time") # create a pool with a maximum of 4 processes pool = multiprocessing.Pool(self.max_jobs) # range from 1 to self.total_seg segments = range(1, self.total_seg + 1) # create partial to interface with pool.map # func=partial(spawn_framework_segment, exe, self.cfg, JOB_ID,log_path=log_path) # iterate over segment number to spawn framework segment processes pool.map(self._segment, segments) # close pool pool.close() # join pool pool.join() @timethis def _postproc(self): """ executes post processing. NOTE: operational users should feel free to replace this module in it's entirety. """ try: with open(self.log_path / "post_log.txt", "w") as outfile: print("configuration: ", self.cfg, file=outfile, flush=True) subprocess.run([self.exe, self.cfg, "-j", self.job_id, "-m", "post"], stdin=None, check=True, stdout=outfile, stderr=outfile, shell=False) except Exception as e: raise AlgorithmExecutionError(e, "Postprocessing failed") def _generate_job_id(self): """ generate a unique JOB Id for this job """ now = datetime.datetime.now() pid = str(os.getpid()) JOB_ID = "framework_job_%d%d%d%d%d%d_%d_%s" % ( now.year, now.month, now.day, now.hour, now.minute, now.second, now.microsecond, pid) return JOB_ID