"""Subset a markdown changelog according to a YAML specification.

The changelog is read line by line into a tree of :class:`Node` objects
keyed on markdown headings (``#``, ``##``, ``###``).  Bullet lines that carry
tags of the form ``**[ALG ...] \\<INST ...\\>`` are kept only when both the
algorithm tags and the instrument tags match the filters built from the
specification; headings that end up without any content are dropped before
the result is written out.
"""
from __future__ import annotations

import argparse
import io
import logging
import re
import sys
from functools import partial
from pathlib import Path
from typing import Callable, Iterable, Iterator, List, Optional, Tuple, Union

try:
    import yaml
except ImportError:  # only main() needs PyYAML (to read the specification)
    yaml = None

_log = logging.getLogger(__name__)

#
# Heading detection: one pattern per markdown heading depth.
#
l0 = re.compile(r'^#\s')  # level 1 heading
l1 = re.compile(r'^##\s')  # level 2 heading
l2 = re.compile(r'^###\s')  # level 3 heading
levels = [l0, l1, l2]


def find_level(line, prior):
    """Return the heading level of ``line``.

    Returns 1, 2 or 3 when the line starts with ``#``, ``##`` or ``###``
    followed by whitespace.  For any other line the result is ``0`` when
    ``prior`` is 0 and ``None`` otherwise; both values are falsy, so callers
    can test ``not level`` to detect a non-heading line.
    """
    for depth, pattern in enumerate(levels, start=1):
        if pattern.match(line):
            return depth

    # Not a heading: report 0 when nothing came before, otherwise None.
    return 0 if prior == 0 else None


#
# Line filters:
#   bulletre  - bullet lines (starting with "- " or "* ")
#   newlinere - lines that start with a newline
#   emptyre   - empty lines
#   tagsre    - bullets tagged as "**[<algorithms>] \<<instruments>\>"
#
bulletre = re.compile(r'^(-|\*)\s')
newlinere = re.compile(r'^\n')
emptyre = re.compile(r'^$')
# The group names "alg" and "inst" are the keys is_kept() looks up.
tagsre = re.compile(
    r'^(-|\*)\s*\*\*\[(?P<alg>.*)\]\s*\\<(?P<inst>.*)\\>',
    re.MULTILINE,
)

# Upper bound on how many times one line may be re-offered to process_line()
# while create_tree() walks back up the tree.
_MAX_ATTEMPTS_PER_LINE = 100


class Node:
    """A line of text nested by heading hierarchy.

    Heading levels: ``#`` vs ``##`` vs ``###``.
    Bullet markers: ``-`` or ``*``.
    """

    def __init__(self, line, level, parent, heading):
        """Store the line text together with its position in the tree.

        :param line: raw text of the line (including its trailing newline).
        :param level: depth of the node; the root uses 0.
        :param parent: owning node, or ``None`` for the root.
        :param heading: ``True`` for heading lines, ``False`` for content.
        """
        self.children: List[Node] = []
        self.parent: Optional[Node] = parent
        self.level = level
        self.heading = heading
        self.text = line

    def add_child(self, node: Node) -> None:
        """Append ``node`` to this node's children."""
        self.children.append(node)

    def as_text(self) -> Iterator[str]:
        """Yield this node's text followed by all descendants, depth first."""
        yield self.text
        for child in self.children:
            yield from child.as_text()

    def as_dict(self):
        """Represent the subtree as nested dicts/lists (for testing purposes).

        A leaf is its text, a node with one child maps its text to that
        child's representation, and a node with several children maps its
        text to a list of their representations.
        """
        if len(self.children) > 1:
            return {self.text: [node.as_dict() for node in self.children]}
        if len(self.children) == 1:
            return {self.text: self.children[0].as_dict()}
        return self.text

    def clean_house(self) -> None:
        """Recursively delete heading nodes that have no children left."""
        candidates = self.children
        self.children = []
        for child in candidates:
            # Prune the subtree first so that emptiness propagates upwards.
            child.clean_house()
            # Non-headings are always kept; headings only when non-empty.
            if not child.heading or child.children:
                self.children.append(child)


def match_tag(tags: str, filters: List[re.Pattern]) -> bool:
    """Return True if any whitespace-separated item of ``tags`` matches a filter.

    ``tags`` is a space separated list (e.g. ``"CLOUD AEROSOL"``).  Each
    filter is applied with ``Pattern.match``, i.e. anchored at the start of
    the item only.
    """
    return any(
        pattern.match(key)
        for key in tags.split()
        for pattern in filters
    )


def is_kept(line, alg_filters, inst_filters) -> bool:
    """Return True if ``line`` should be kept in the filtered changelog.

    * Empty lines are dropped.
    * Lines that are not bullets are kept.
    * Bullets without recognisable tags are kept, with a logged warning.
    * Tagged bullets are kept only when both the algorithm tags and the
      instrument tags match the corresponding filters.
    """
    if newlinere.match(line) or emptyre.match(line):
        return False

    if not bulletre.match(line):
        return True

    tag_match = tagsre.match(line)
    if tag_match is None:
        _log.warning("did not see tags for this log: '%s'", line)
        return True

    return (
        match_tag(tag_match.group('alg'), alg_filters)
        and match_tag(tag_match.group('inst'), inst_filters)
    )


def create_is_kept_func(spec) -> Tuple[str, Callable[[str], bool]]:
    """Build the line filter described by ``spec``.

    :param spec: mapping with the keys ``project`` (title), ``algorithms``,
        ``instruments`` and ``satellites`` (lists of regular expressions).
        A list given as ``None`` (an empty YAML value) is treated as empty.
    :returns: ``(title, is_kept_func)`` where ``is_kept_func(line)`` is
        :func:`is_kept` bound to the compiled filters.
    :raises KeyError: if one of the keys is missing from ``spec``.
    """
    title = spec['project']
    alg_filters = [re.compile(elem) for elem in spec['algorithms'] or []]

    # The instrument tag "all" always passes; satellites are treated as
    # additional instrument filters.
    inst_filters = [re.compile(r'all')]
    inst_filters.extend(re.compile(elem) for elem in spec['instruments'] or [])
    inst_filters.extend(re.compile(elem) for elem in spec['satellites'] or [])

    return title, partial(is_kept, alg_filters=alg_filters, inst_filters=inst_filters)


def process_line(line, current, level, is_kept_func):
    """Try to place ``line`` in the tree relative to ``current``.

    :param line: the line of text to place.
    :param current: node that is currently being filled.
    :param level: prior level handed to :func:`find_level`.
    :param is_kept_func: predicate deciding whether a content line is kept.
    :returns: ``(current, processed)``.  ``processed`` is False when the line
        is a heading at or above the depth of ``current``; in that case the
        returned node is the parent of ``current`` and the caller must offer
        the same line again.
    """
    level = find_level(line, level)

    if not level:
        # Content line: it is consumed here, whether it is kept or not.
        child_level = current.level + 1
        if is_kept_func(line) or child_level < 1:
            current.add_child(Node(line, child_level, current, heading=False))
        return current, True

    if level <= current.level:
        # Heading at the same depth or higher up: step back one level.
        return current.parent, False

    # Deeper heading: it becomes a child of, and then replaces, ``current``.
    new_current = Node(line, level, current, heading=True)
    current.add_child(new_current)
    return new_current, True


def create_tree(lines: Union[io.TextIOBase, Iterable[str]], root: Node, is_kept_func):
    """Create a tree of all the headings and bullets read from ``lines``.

    :param lines: open text file (or any iterable of lines).
    :param root: node under which the tree is built.
    :param is_kept_func: predicate deciding whether a content line is kept.
    :returns: ``(root, current)`` where ``current`` is the node that was
        being filled when the input ended.
    :raises RuntimeError: if a single line cannot be placed after
        ``_MAX_ATTEMPTS_PER_LINE`` attempts.
    """
    current: Node = root
    # The prior level is always reported as 0, so find_level() answers 0
    # (rather than None) for every non-heading line.
    level = 0

    for line in lines:
        # A heading may need several attempts: each unprocessed attempt moves
        # ``current`` one step towards the root.
        for _ in range(_MAX_ATTEMPTS_PER_LINE):
            current, processed = process_line(line, current, level, is_kept_func)
            if processed:
                break
        else:
            raise RuntimeError(f'Unable to process line: {line}')

    return root, current


def main(input_file: Path, output_file: Path, spec_file: Path):
    """Filter the changelog ``input_file`` into ``output_file``.

    Called by the CLI (``__main__``).

    :param input_file: incoming markdown changelog.
    :param output_file: destination of the filtered changelog.
    :param spec_file: YAML specification (see :func:`create_is_kept_func`).
    :raises ImportError: if PyYAML is not installed.
    """
    if yaml is None:
        raise ImportError("PyYAML is required to read the specification file")

    with open(spec_file, "r") as s:
        spec = yaml.safe_load(s)

    title, is_kept_func = create_is_kept_func(spec)

    # The project title becomes the root node of the output.
    root = Node(f'# {title}\n', 0, None, heading=True)

    # Each line in the file becomes part of an organized graph
    # where "##" is a child of "#" and "###" is a child of "##".
    with open(input_file, 'r') as fin:
        root, _ = create_tree(fin, root, is_kept_func)

    # Filtering during the read can leave headings without any children;
    # delete those unused headings.
    root.clean_house()

    with open(output_file, "w") as fo:
        fo.writelines(root.as_text())


def _build_parser() -> argparse.ArgumentParser:
    """Create the command line parser for the changelog subsetting tool."""
    parser = argparse.ArgumentParser(
        description='This tool will subset the changelog according to the provided specification document')
    parser.add_argument('input_file', type=str, help='input changelog file')
    parser.add_argument('output_file', type=str, help='output changelog file')
    parser.add_argument('spec', type=str, help='Specification for subset')
    parser.add_argument('--start_rev', type=str, help='starting revision for changelog.')
    parser.add_argument('--end_rev', type=str, help='ending revision for changelog.')
    return parser


#
# CLI
#
if __name__ == "__main__":
    parser = _build_parser()
    # Show the help text (and exit) when invoked without any arguments.
    args = parser.parse_args(args=None if sys.argv[1:] else ['--help'])
    if args.start_rev or args.end_rev:
        raise NotImplementedError("--start_rev and --end_rev is not yet implemented.")
    main(Path(args.input_file), Path(args.output_file), Path(args.spec))