core.evaluation.results
import os, re, time
from copy import deepcopy
from functools import reduce

from core.model.exec import Executor
from core.utils import const, read_json_file, write_json_file, write_file, strip_all


class Results():
    """
    Read the results of single train and evaluation loops and combine them.
    See the source of `core.results` to find out how to use this class.
    """

    _FILE_PATTERN = re.compile("execution_(?:[A-Za-z]+-?)+_(\d\d\d\d-\d\d-\d\d_\d\d-\d\d-\d\d)\.json")
    _IGNORE_KEYS = ['num_training_metrics', 'num_results']

    def __init__(self):
        # read all files
        self.results = {}
        for f in os.listdir(const.RESULTS_DIR):
            if Results._FILE_PATTERN.match(f):
                self._load_results_file(f)

        # calculate averages
        self.avg_results = self._average(deepcopy(self.results))

    def _get_key_list(self, data):
        return [
            data['classes']['model'] + (
                (("-" + data['parameters']['scd_mapping']) if data['parameters']['scd_mapping'] != "none" else "")
                + (("-" + str(round(data['parameters']['scd_threshold'], 2))) if data['parameters']['scd_threshold'] != "none" else "")
            ),
            data['names']['base_model'],
            strip_all(data['names']['annotated_corpus_train']) + (
                "-same-annotations" if not data['parameters']['split_annotator'] else ""
            ),
            strip_all(data['names']['annotated_corpus_eval']) + (
                "-same-annotations" if not data['parameters']['split_annotator'] else ""
            ),
            str(data['percentages'][0]) + "-" + str(data['percentages'][1]),
            data['parameters']['bert_training']
        ]

    def _get_key_list_names(self):
        return [
            "Model", "BaseModel", "TrainingData", "EvaluationData",
            "TrainTestRatio", "BERTParameter", "Source", "Type", "Value"
        ]

    def _get_value_list(self, data):
        return data['results'], data['training_metrics']

    def _load_results_file(self, filename):
        data = read_json_file(os.path.join(const.RESULTS_DIR, filename))
        parts = Results._FILE_PATTERN.match(filename)

        current = self.results
        for key in self._get_key_list(data):
            if key not in current:
                current[key] = {}
            current = current[key]

        unixtime = int(time.mktime(time.strptime(parts.group(1), Executor.TIME_FORMAT)))
        current[unixtime] = self._get_value_list(data)

    def _average(self, r):
        for k, v in r.items():
            if reduce(lambda a, b: a and str(b).isnumeric(), v.keys(), True):
                # all keys are timestamps => do the averages!
                averages = {
                    'results': {},
                    'num_results': 0,
                    'training_metrics': {},
                    'num_training_metrics': 0
                }
                for results, training_metrics in v.values():
                    self._sum_to_dict(averages, 'results', 'num_results', results)
                    if training_metrics != None:
                        self._sum_to_dict(averages, 'training_metrics', 'num_training_metrics', training_metrics)
                self._avg_over_dict(averages, 'results', 'num_results')
                self._avg_over_dict(averages, 'training_metrics', 'num_training_metrics')
                r[k] = averages
            else:
                r[k] = self._average(v)
        return r

    def _avg_over_dict(self, dict, column, count_column):
        for name in dict[column].keys():
            dict[column][name] /= dict[count_column]

    def _sum_to_dict(self, dict, column, count_column, values):
        dict[count_column] += 1
        for name, value in values.items():
            if name not in dict[column]:
                dict[column][name] = 0
            dict[column][name] += value

    def write_json(self, averages=True):
        """
        Write a combined JSON Report
        """
        write_json_file(
            os.path.join(const.RESULTS_DIR, "report" + ("" if averages else "_raw") + ".json"),
            self.avg_results if averages else self.results
        )
        print("==> " + ("Averaged " if averages else "") + "JSON written!")

    def _get_csv_rows(self, prefix, data):
        rows = []
        for k, v in data.items():
            if isinstance(v, dict):
                rows.extend(self._get_csv_rows(prefix + [k], v))
            else:
                if k not in Results._IGNORE_KEYS:
                    rows.append(prefix + [k, v])
        return rows

    def write_csv(self):
        """
        Write a CSV version of the report (to be read and plotted by R)
        """
        # organize data
        csv = []
        csv.append(self._get_key_list_names())
        csv.extend(self._get_csv_rows([], self.avg_results))

        # write file
        csv_file = ""
        for row in csv:
            csv_file += ','.join(map(lambda e: str(e), row)) + "\n"
        write_file(os.path.join(const.RESULTS_DIR, "report.csv"), csv_file)
        print("==> CSV written!")
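Only files in const.RESULTS_DIR whose names match _FILE_PATTERN are loaded; the captured timestamp is parsed with time.strptime using Executor.TIME_FORMAT and becomes the innermost key of self.results. A small sketch of the match (the model name here is a hypothetical placeholder):

    import re

    pattern = re.compile(r"execution_(?:[A-Za-z]+-?)+_(\d\d\d\d-\d\d-\d\d_\d\d-\d\d-\d\d)\.json")
    filename = "execution_ExampleModel_2024-01-31_12-00-00.json"  # hypothetical file name
    match = pattern.match(filename)
    print(match.group(1))  # '2024-01-31_12-00-00', later converted to a Unix timestamp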
class Results()
Read the results of single train and evaluation loops and combine them.
See the source of core.results to find out how to use this class.
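A minimal usage sketch, assuming finished runs have already written their execution_*.json files into const.RESULTS_DIR:

    from core.evaluation.results import Results

    results = Results()       # scan const.RESULTS_DIR and average all matching runs
    results.write_json()      # combined, averaged JSON report
    results.write_csv()       # CSV report for plotting in R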
def __init__(self):
    # read all files
    self.results = {}
    for f in os.listdir(const.RESULTS_DIR):
        if Results._FILE_PATTERN.match(f):
            self._load_results_file(f)

    # calculate averages
    self.avg_results = self._average(deepcopy(self.results))
def write_json(self, averages=True):
    """
    Write a combined JSON Report
    """
    write_json_file(
        os.path.join(const.RESULTS_DIR, "report" + ("" if averages else "_raw") + ".json"),
        self.avg_results if averages else self.results
    )
    print("==> " + ("Averaged " if averages else "") + "JSON written!")
Write a combined JSON Report
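The averages flag switches between the averaged and the raw results and also changes the file name; a short sketch (paths are relative to const.RESULTS_DIR):

    results.write_json()                 # report.json: one averaged entry per parameter combination
    results.write_json(averages=False)   # report_raw.json: every run, keyed by its Unix timestamp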
def write_csv(self):
    """
    Write a CSV version of the report (to be read and plotted by R)
    """
    # organize data
    csv = []
    csv.append(self._get_key_list_names())
    csv.extend(self._get_csv_rows([], self.avg_results))

    # write file
    csv_file = ""
    for row in csv:
        csv_file += ','.join(map(lambda e: str(e), row)) + "\n"
    write_file(os.path.join(const.RESULTS_DIR, "report.csv"), csv_file)
    print("==> CSV written!")
Write a CSV version of the report (to be read and plotted by R)
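The rows are written as plain comma-separated values without quoting, so the report can also be inspected directly from Python; a small sketch:

    import csv, os
    from core.utils import const

    with open(os.path.join(const.RESULTS_DIR, "report.csv")) as f:
        header, *rows = list(csv.reader(f))

    # header: Model, BaseModel, TrainingData, EvaluationData, TrainTestRatio,
    #         BERTParameter, Source, Type, Value
    # each row holds the six grouping keys, then 'results' or 'training_metrics',
    # the metric name, and its averaged value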