core.evaluation.results

View Source
import os, re, time

from copy import deepcopy 
from functools import reduce

from core.model.exec import Executor
from core.utils import const, read_json_file, write_json_file, write_file, strip_all


class Results():
	"""
		Read the results of single train and evaluation loops
		and combine them.

		See the source of `core.results` to find out how to use this class.
	"""

	# Matches a single-run result file name and captures its timestamp,
	# e.g. "execution_MyModel-variant_2021-01-31_23-59-59.json" -> "2021-01-31_23-59-59".
	# Raw string avoids invalid-escape DeprecationWarnings for "\d".
	_FILE_PATTERN = re.compile(r"execution_(?:[A-Za-z]+-?)+_(\d\d\d\d-\d\d-\d\d_\d\d-\d\d-\d\d)\.json")

	# internal counters used while averaging; never exported to the CSV
	_IGNORE_KEYS = [
		'num_training_metrics',
		'num_results'
	]

	def __init__(self):
		# read all result files found in the results directory
		self.results = {}
		for f in os.listdir(const.RESULTS_DIR):
			if Results._FILE_PATTERN.match(f):
				self._load_results_file(f)

		# calculate averages over all runs of each configuration
		self.avg_results = self._average(deepcopy(self.results))

	def _get_key_list(self, data):
		"""
			Return the list of nesting keys identifying one experiment
			configuration (model + scd options, base model, train/eval
			corpora, train-test ratio, BERT parameter).
		"""
		return [
			data['classes']['model'] + (
					(("-" + data['parameters']['scd_mapping']) if data['parameters']['scd_mapping'] != "none" else "") +
					(("-" + str(round(data['parameters']['scd_threshold'], 2))) if data['parameters']['scd_threshold'] != "none" else "")
				),
			data['names']['base_model'],
			strip_all(data['names']['annotated_corpus_train']) + ( "-same-annotations" if not data['parameters']['split_annotator'] else "" ),
			strip_all(data['names']['annotated_corpus_eval']) + ( "-same-annotations" if not data['parameters']['split_annotator'] else "" ),
			str(data['percentages'][0]) + "-" + str(data['percentages'][1]),
			data['parameters']['bert_training']
		]

	def _get_key_list_names(self):
		"""
			Return the CSV header: one column per key from `_get_key_list`
			plus the trailing Source/Type/Value columns of each data row.
		"""
		return [
			"Model",
			"BaseModel",
			"TrainingData",
			"EvaluationData",
			"TrainTestRatio",
			"BERTParameter",
			"Source",
			"Type",
			"Value"
		]

	def _get_value_list(self, data):
		"""
			Return the `(results, training_metrics)` tuple of one run;
			`training_metrics` may be None.
		"""
		return data['results'], data['training_metrics']

	def _load_results_file(self, filename):
		"""
			Read a single result file and store its values in `self.results`
			under the configuration keys, indexed by the run's unix timestamp.
		"""
		data = read_json_file(os.path.join(
				const.RESULTS_DIR,
				filename
			))
		parts = Results._FILE_PATTERN.match(filename)

		# walk (and extend) the nested dict along the configuration keys
		current = self.results
		for key in self._get_key_list(data):
			current = current.setdefault(key, {})

		unixtime = int(time.mktime(time.strptime(parts.group(1), Executor.TIME_FORMAT)))
		current[unixtime] = self._get_value_list(data)

	def _average(self, r):
		"""
			Recursively replace every timestamp level of the nested result
			tree `r` with per-metric averages over its runs.
			Mutates `r` in place and returns it.
		"""
		for k, v in r.items():
			if all(str(ts).isnumeric() for ts in v.keys()):
				# all keys are timestamps => do the averages!
				averages = {
					'results' : {},
					'num_results' : 0,
					'training_metrics' : {},
					'num_training_metrics' : 0
				}

				for results, training_metrics in v.values():
					self._sum_to_dict(averages, 'results', 'num_results', results)

					if training_metrics is not None:
						self._sum_to_dict(averages, 'training_metrics', 'num_training_metrics', training_metrics)

				self._avg_over_dict(averages, 'results', 'num_results')
				self._avg_over_dict(averages, 'training_metrics', 'num_training_metrics')

				r[k] = averages
			else:
				r[k] = self._average(v)

		return r

	def _avg_over_dict(self, acc, column, count_column):
		"""Divide every summed value in `acc[column]` by `acc[count_column]`."""
		count = acc[count_column]
		if count == 0:
			# nothing was summed (e.g. no run had training metrics)
			return
		for name in acc[column]:
			acc[column][name] /= count

	def _sum_to_dict(self, acc, column, count_column, values):
		"""Add `values` entry-wise onto `acc[column]` and count the run."""
		acc[count_column] += 1
		for name, value in values.items():
			acc[column][name] = acc[column].get(name, 0) + value

	def write_json(self, averages=True):
		"""
			Write a combined JSON Report
		"""
		write_json_file(
				os.path.join(
					const.RESULTS_DIR,
					"report" + ("" if averages else "_raw") + ".json"
				),
				self.avg_results if averages else self.results
			)
		print("==> " + ("Averaged " if averages else "") + "JSON written!")

	def _get_csv_rows(self, prefix, data):
		"""
			Flatten the nested averaged results into CSV rows; `prefix` holds
			the key path accumulated so far. Counter keys are skipped.
		"""
		rows = []
		for k, v in data.items():
			if isinstance(v, dict):
				rows.extend(self._get_csv_rows(prefix + [k], v))
			elif k not in Results._IGNORE_KEYS:
				rows.append(prefix + [k, v])
		return rows

	def write_csv(self):
		"""
			Write a CSV version of the report (to be read and plotted by R)
		"""
		# organize data
		rows = [self._get_key_list_names()]
		rows.extend(self._get_csv_rows([], self.avg_results))

		# build the file in one pass instead of quadratic string concatenation
		csv_file = "".join(','.join(map(str, row)) + "\n" for row in rows)
		write_file(
				os.path.join(
					const.RESULTS_DIR,
					"report.csv"
				),
				csv_file
			)
		print("==> CSV written!")
#   class Results:
View Source
class Results():
	"""
		Read the results of single train and evaluation loops
		and combine them.

		See the source of `core.results` to find out how to use this class.
	"""

	# file name pattern of a single-run result file; group 1 is the timestamp
	_FILE_PATTERN = re.compile("execution_(?:[A-Za-z]+-?)+_(\d\d\d\d-\d\d-\d\d_\d\d-\d\d-\d\d)\.json")

	# bookkeeping counters, excluded from the CSV output
	_IGNORE_KEYS = [
		'num_training_metrics',
		'num_results'
	]

	def __init__(self):
		# load every matching result file from the results directory
		self.results = {}
		for name in filter(Results._FILE_PATTERN.match, os.listdir(const.RESULTS_DIR)):
			self._load_results_file(name)

		# derive the averaged view from a deep copy of the raw data
		self.avg_results = self._average(deepcopy(self.results))

	def _get_key_list(self, data):
		"""
			Build the nesting keys that identify one experiment configuration.
		"""
		params = data['parameters']
		model = data['classes']['model']
		if params['scd_mapping'] != "none":
			model += "-" + params['scd_mapping']
		if params['scd_threshold'] != "none":
			model += "-" + str(round(params['scd_threshold'], 2))

		annotation_suffix = "" if params['split_annotator'] else "-same-annotations"
		return [
			model,
			data['names']['base_model'],
			strip_all(data['names']['annotated_corpus_train']) + annotation_suffix,
			strip_all(data['names']['annotated_corpus_eval']) + annotation_suffix,
			str(data['percentages'][0]) + "-" + str(data['percentages'][1]),
			params['bert_training']
		]

	def _get_key_list_names(self):
		"""
			Column headers of the CSV report.
		"""
		return [
			"Model", "BaseModel", "TrainingData", "EvaluationData",
			"TrainTestRatio", "BERTParameter", "Source", "Type", "Value"
		]

	def _get_value_list(self, data):
		"""
			The (results, training_metrics) pair of one run.
		"""
		return data['results'], data['training_metrics']

	def _load_results_file(self, filename):
		"""
			Parse one result file and insert its values into the nested
			results dict, keyed by configuration and unix timestamp.
		"""
		data = read_json_file(os.path.join(const.RESULTS_DIR, filename))
		match = Results._FILE_PATTERN.match(filename)

		node = self.results
		for key in self._get_key_list(data):
			if key not in node:
				node[key] = {}
			node = node[key]

		parsed = time.strptime(match.group(1), Executor.TIME_FORMAT)
		node[int(time.mktime(parsed))] = self._get_value_list(data)

	def _average(self, tree):
		"""
			Recursively collapse every timestamp level of `tree` into
			averaged metrics; mutates and returns `tree`.
		"""
		for key, subtree in tree.items():
			if not all(str(k).isnumeric() for k in subtree.keys()):
				# not a leaf level yet; descend
				tree[key] = self._average(subtree)
				continue

			# every key is a timestamp => average the runs
			acc = {
				'results': {},
				'num_results': 0,
				'training_metrics': {},
				'num_training_metrics': 0
			}
			for run_results, run_metrics in subtree.values():
				self._sum_to_dict(acc, 'results', 'num_results', run_results)
				if run_metrics is not None:
					self._sum_to_dict(acc, 'training_metrics', 'num_training_metrics', run_metrics)

			self._avg_over_dict(acc, 'results', 'num_results')
			self._avg_over_dict(acc, 'training_metrics', 'num_training_metrics')
			tree[key] = acc

		return tree

	def _avg_over_dict(self, acc, column, count_column):
		"""
			Turn the sums in `acc[column]` into means using `acc[count_column]`.
		"""
		for metric in acc[column]:
			acc[column][metric] /= acc[count_column]

	def _sum_to_dict(self, acc, column, count_column, values):
		"""
			Accumulate `values` into `acc[column]` and increment the run counter.
		"""
		acc[count_column] += 1
		for metric, value in values.items():
			acc[column][metric] = acc[column].get(metric, 0) + value

	def write_json(self, averages=True):
		"""
			Write a combined JSON Report
		"""
		target = "report" + ("" if averages else "_raw") + ".json"
		payload = self.avg_results if averages else self.results
		write_json_file(os.path.join(const.RESULTS_DIR, target), payload)
		print("==> " + ("Averaged " if averages else "") + "JSON written!")

	def _get_csv_rows(self, prefix, data):
		"""
			Flatten nested averages into CSV rows, skipping counter keys.
		"""
		rows = []
		for key, value in data.items():
			if isinstance(value, dict):
				rows += self._get_csv_rows(prefix + [key], value)
			elif key not in Results._IGNORE_KEYS:
				rows.append(prefix + [key, value])
		return rows

	def write_csv(self):
		"""
			Write a CSV version of the report (to be read and plotted by R)
		"""
		# header first, then the flattened data rows
		table = [self._get_key_list_names()]
		table.extend(self._get_csv_rows([], self.avg_results))

		# serialize one comma-joined line per row
		body = ""
		for record in table:
			body += ','.join(str(cell) for cell in record) + "\n"
		write_file(os.path.join(const.RESULTS_DIR, "report.csv"), body)
		print("==> CSV written!")

Read the results of single train and evaluation loops and combine them.

See the source of core.results to find out how to use this class.

#   Results()
View Source
	def __init__(self):
		"""
			Load every matching result file, then compute the per-configuration
			averages over a deep copy of the raw results.
		"""
		self.results = {}
		for entry in filter(Results._FILE_PATTERN.match, os.listdir(const.RESULTS_DIR)):
			self._load_results_file(entry)

		self.avg_results = self._average(deepcopy(self.results))
#   def write_json(self, averages=True):
View Source
	def write_json(self, averages=True):
		"""
			Write a combined JSON Report
		"""
		filename = "report" + ("" if averages else "_raw") + ".json"
		payload = self.avg_results if averages else self.results
		write_json_file(os.path.join(const.RESULTS_DIR, filename), payload)
		print("==> " + ("Averaged " if averages else "") + "JSON written!")

Write a combined JSON Report

#   def write_csv(self):
View Source
	def write_csv(self):
		"""
			Write a CSV version of the report (to be read and plotted by R)
		"""
		# header row followed by the flattened averaged results
		table = [self._get_key_list_names()]
		table.extend(self._get_csv_rows([], self.avg_results))

		# serialize: one comma-joined, newline-terminated line per row
		content = "".join(','.join(str(cell) for cell in row) + "\n" for row in table)
		write_file(os.path.join(const.RESULTS_DIR, "report.csv"), content)
		print("==> CSV written!")

Write a CSV version of the report (to be read and plotted by R)