core.model.scdmatrix.models

View Source
import os, warnings

import numpy as np
from tqdm import tqdm

warnings.filterwarnings(action='ignore', category=UserWarning) # some annoying warning
from gensim.models.doc2vec import TaggedDocument, Doc2Vec
warnings.filterwarnings(action='default', category=UserWarning) # reset defaults

from core.corpus import DefaultPreprocessor
from core.model.scdmatrix.model import SCDMatrix
from core.utils import write_json_file, read_json_file, calc_metrics, GeneratorToIterator, Random, CacheName

class iSCDMatrix(SCDMatrix):
	'''
		Represents a model detecting inline SCDs (iSCDs) via a trained SCD matrix.  
		See [Identifying Subjective Content Descriptions among Text](http://ifis.uni-luebeck.de/uploads/tx_wapublications/identifying_scds_among_texts_public.pdf) for more information.

		**This model does not use a GPU and never will. The calculation of SCD similarity values via `_get_scds` (used
		by e.g. `train()` and `evaluate()` ) uses multiple cores by default.**
	'''

	def __init__(self,
			annotated_corpus_train, annotated_corpus_eval,
			manual_threshold=None,
			**kwargs
		):
		"""
			Args:
				annotated_corpus_train: Corpus iterated for training, forwarded to the base class
				annotated_corpus_eval: Corpus iterated for evaluation, forwarded to the base class
				manual_threshold (float): Threshold to use instead of the self-estimated one;
					has to be in [0.01, 0.99]. If `None`, the threshold is estimated in `_init_subclass`.
				**kwargs: Forwarded unchanged to the base class constructor
			Raises:
				AttributeError: If `manual_threshold` is given but lies outside of [0.01, 0.99]
		"""
		if manual_threshold is not None and (manual_threshold < 0.01 or manual_threshold > 0.99):
			raise AttributeError("The manual_threshold needs to be a float in [0.01, 0.99] or None to self estimate!")
		# stored before calling the base constructor
		# (presumably the base constructor may already trigger `_init_subclass` -- TODO confirm)
		self.manual_threshold = manual_threshold

		super().__init__(annotated_corpus_train, annotated_corpus_eval, **kwargs)

	def _get_scds_generator_train(self):
		"""
			Iterates the inline SCD texts of the training corpus.

			Yields:
				`(sentence, True)` for each sentence whose `is_scd` flag is truthy; other sentences are skipped
		"""
		for sentences, is_scds in self.annotated_corpus_train.iterate_inline_scd_texts():
			for sentence, is_scd in zip(sentences, is_scds):
				if is_scd:
					yield sentence, True

	def _init_subclass(self):
		"""
			Sets `self.threshold`.

			Without a manual threshold the value is either read from the cache file or
			estimated as the 70th percentile of column 0 of the `_get_scds` result for the
			inline SCDs of the training corpus (and cached if `self.write_model` is set).
			With a manual threshold that value is used directly.
		"""
		if self.manual_threshold is None:
			cache_filename = CacheName.filename(self.cache_name + "_special.json")
			if self.ignore_cache or not os.path.isfile(cache_filename):
				print("Training Step 4/4:")
				with tqdm(total=self.annotated_corpus_train.get_num_sentences()) as timeline:
					scd_sims = self._get_scds(self._get_scds_generator_train, timeline=timeline)

				scd_sims = np.array(scd_sims)
				# percentiles 0, 5, 10, ..., 100 of column 0
				percentiles = np.percentile(scd_sims[:, 0], [i * 5 for i in range(21)])

				self.threshold = percentiles[14] # means perc. 70

				if self.write_model:
					write_json_file(
						cache_filename, {
							'percentiles' : percentiles.tolist(),
							'threshold' : self.threshold
						})
			else:
				data = read_json_file(cache_filename)
				self.threshold = data['threshold']
		else:
			print("Training Step 4/4: Skipped cause manual_threshold defined")
			self.threshold = self.manual_threshold

	def _get_scds_generator_evaluate(self):
		"""
			Iterates the inline SCD texts of the evaluation corpus.

			Yields:
				`(sentence, is_scd)` for every sentence
		"""
		for sentences, is_scds in self.annotated_corpus_eval.iterate_inline_scd_texts():
			for sentence, is_scd in zip(sentences, is_scds):
				yield sentence, is_scd

	def _evaluate(self):
		"""
			Classifies every sentence of the evaluation corpus via the threshold.

			Returns:
				The result of `calc_metrics(labels, predictions)`
		"""
		# do the prediction
		print("Evaluate Step 1/1:")
		# NOTE(review): total is twice the number of sentences; the reason is not visible in this class
		with tqdm(total=self.annotated_corpus_eval.get_num_sentences()*2) as timeline:
			scd_sims = self._get_scds(self._get_scds_generator_evaluate, timeline=timeline)

		scd_sims = np.array(scd_sims) # to numpy and select columns
		labels = scd_sims[:, 2]
		predictions = scd_sims[:, 0]
		# use threshold and make "0", "1": a value below the threshold is predicted as 1
		predictions = np.where(predictions < self.threshold, 1, 0)

		return calc_metrics(labels, predictions)

	def _predict(self, sentence):
		"""
			Predicts for a single sentence if it is an inline SCD.

			Args:
				sentence: The sentence, handed to the preprocessor's `preprocess_words`
			Returns:
				is_scd, a message about the decision, a message with the text of the SCD found by `_get_scd`
		"""
		try:
			words = self.preprocessor.preprocess_words(sentence)
		except Exception:
			# Fall back to the default preprocessor (e.g. if none is set yet).
			# `Exception` instead of a bare except, so that KeyboardInterrupt and
			# SystemExit are not swallowed.
			self.preprocessor = DefaultPreprocessor()
			words = self.preprocessor.preprocess_words(sentence)

		s, i = self._get_scd(words)
		is_scd = s < self.threshold

		return is_scd, "Seems to be " + ("a" if is_scd else "no" ) + " scd!", "Would predict as SCD: '" + ' '.join(self._get_scd_text(i)) + "'"

class MPSCDMatrix(SCDMatrix):
	'''
		Represents a model predicting most probable SCDs (MPSCDs) via a trained SCD matrix.  
		See [Augmenting and Automating Corpus Enrichment](http://ifis.uni-luebeck.de/uploads/tx_wapublications/ws-ijsc_public.pdf) for more information.

		**This model does not use a GPU and never will. The calculation of SCD similarity values via `_get_scds` (used
		by e.g. `train()` and `evaluate()` ) uses multiple cores by default.**
	'''

	NUM_CHOICES = 4
	'''
		Defines the number of SCDs from which the system has to choose the matching one.
	'''

	def __init__(self,
			annotated_corpus_train, annotated_corpus_eval,
			scd_mapping='d2v',
			**kwargs
		):
		"""
			Args:
				annotated_corpus_train: Corpus iterated for training, forwarded to the base class
				annotated_corpus_eval: Corpus iterated for evaluation, forwarded to the base class
				scd_mapping (str of 'd2v', 'ia'): How to map the SCD predicted by the model to one of the given multiple choice SCDs.
					May use Doc2Vec 'd2v' or an inverse annotator 'ia' (if supported by used annotator)
				**kwargs: Forwarded unchanged to the base class constructor
			Raises:
				AttributeError: If `scd_mapping` is neither 'd2v' nor 'ia'
		"""
		if scd_mapping not in ('d2v', 'ia'):
			raise AttributeError("The scd_mapping needs to be one of 'd2v', 'ia'!")
		self.scd_mapping = scd_mapping

		super().__init__(annotated_corpus_train, annotated_corpus_eval, **kwargs)

	def _tagged_documents_generator_train(self):
		"""
			Yields one `TaggedDocument` per SCD of the training corpus, tagged with
			the id looked up in `self.scd_map`.
		"""
		for _, scds in self.annotated_corpus_train.iterate_sentence_scds(n=self.num_scds_train):
			for scd in scds:
				# key of the map is the string form of the tuple of dictionary indices
				scd_id = self.scd_map[str(tuple(self.dict.doc2idx(scd)))]
				yield TaggedDocument(scd, [scd_id])


	def _init_subclass(self):
		"""
			Prepares the mapping of predicted SCDs to the multiple choice SCDs.

			For 'd2v' a Doc2Vec model is trained (and saved if `self.write_model` is set)
			or loaded from the cache file; for 'ia' only a random generator is created.
		"""
		if self.scd_mapping == 'd2v':
			cache_filename = CacheName.filename(self.cache_name + ".d2v")
			if self.ignore_cache or not os.path.isfile(cache_filename):

				print("Training Step 4/4:")
				self.doc2vec_model = Doc2Vec(
						GeneratorToIterator(self._tagged_documents_generator_train),
						workers=self.num_processes
					)

				if self.write_model:
					self.doc2vec_model.save(cache_filename)
			else:
				self.doc2vec_model = Doc2Vec.load(cache_filename)
		else:
			print("Training Step 4/4: Skipped cause using inverse annotator")
			self.local_random = Random.get_generator()

	def _sentence_vector(self, sentence):
		"""
			Returns the vector inferred by the Doc2Vec model for `sentence`.
		"""
		return self.doc2vec_model.infer_vector(sentence)

	def _get_scds_generator_evaluate(self):
		"""
			Yields `(sentence, i)` for the evaluation corpus, where `i` is the running
			index of the sentence in the iteration.
		"""
		for i, (sentence, _, _) in enumerate(self.annotated_corpus_eval.iterate_sentence_scds_non_scds(matching_n=1, non_matching_n=MPSCDMatrix.NUM_CHOICES-1)):
			yield sentence, i

	def _evaluate(self):
		"""
			Evaluates the multiple choice selection on the evaluation corpus.

			Returns:
				dict with 'accuracy' (share of correct choices, `0.0` if no sentence
				could be evaluated) and 'num' (number of evaluated sentences)
		"""
		# select the "best" scd each
		print("Evaluate Step 1/2:")
		with tqdm(total=self.annotated_corpus_eval.get_num_sentences()) as timeline:
			best_scds = self._get_scds(self._get_scds_generator_evaluate, timeline=timeline)

		best_scds = np.array(best_scds) # to numpy
		if best_scds.ndim == 2: # an empty result has no columns to sort by
			best_scds = best_scds[best_scds[:,2].argsort()] # sort by 'i' as index of sentence when iterating via generator

		print("Evaluate Step 2/2:")
		count_correct, count_all = 0, 0
		with tqdm(total=self.annotated_corpus_eval.get_num_sentences()) as timeline:
			# the generator is stable!
			for i, (sentence, scds, non_scds) in enumerate(self.annotated_corpus_eval.iterate_sentence_scds_non_scds(matching_n=1, non_matching_n=MPSCDMatrix.NUM_CHOICES-1)):
				# only sentences with exactly one correct and at least one wrong scd are counted
				if len(scds) == 1 and len(non_scds) > 0:
					predicted_scd_id = int(best_scds[i,1])
					if self._single_prediction(sentence, predicted_scd_id, scds[0], non_scds):
						count_correct += 1

					count_all += 1

				timeline.update(1)

		return {
			# avoid a ZeroDivisionError if no sentence qualified
			'accuracy' : count_correct / count_all if count_all > 0 else 0.0,
			'num' : count_all
		}

	def _single_prediction(self, sentence, predicted_scd_id, correct_scd, non_scds, return_more=False):
		"""
			Does a single multiple choice prediction, i.e.:

			Transforms the predicted scd by the matrix into a Doc2Vec space, also
			all possible choices are transformed into the same space. Then chooses 
			the scd with the most similar vector in the space from merge(correct_scd, non_scds).

			With `scd_mapping='ia'` the inverse annotator of the evaluation corpus decides
			if the predicted scd is similar to the correct one; if not, a random wrong scd is chosen.

			Args:
				sentence (list of str): Sentence to select the scd for
				predicted_scd_id (int): The id of the scd predicted by the matrix for the sentence
				correct_scd (list of str): The correct scd of the list of scds to choose from
				non_scds (list of list of str): The wrong scds of the list of scds to choose from
				return_more (bool): If the chosen and the predicted scd shall be returned, too
			Returns:
				Boolean if model chose correctly if `return_more=False`; else  
				Boolean if model chose correctly, chosen_scd (list of str), predicted_scd (list of str)
		"""
		predicted_scd = self._get_scd_text(predicted_scd_id)

		if self.scd_mapping == 'd2v':
			predicted_vector = self._sentence_vector(predicted_scd)
			# row 0 is the correct scd, the following rows are the wrong scds
			# (same order of inference calls as before: predicted, correct, wrong ones)
			choice_vectors = np.array(
				[self._sentence_vector(choice) for choice in [correct_scd, *non_scds]],
				dtype=np.float64
			)

			# cosine similarity
			dot_products = np.dot(choice_vectors, predicted_vector)
			dividers = np.linalg.norm(choice_vectors, axis=-1) * np.linalg.norm(predicted_vector)
			# similarity is defined as 0 where a vector has norm 0
			similarities = np.divide(
				dot_products, dividers,
				out=np.zeros_like(dot_products),
				where=(dividers!=0)
			)

			predicted = np.argmax(similarities)

		else:
			if self.annotated_corpus_eval.inverse_annotator_is_similar_annotation(sentence, correct_scd, predicted_scd):
				predicted = 0
			else:
				predicted = self.local_random.randrange(len(non_scds)) + 1 # a wrong scd

		if return_more:
			return predicted == 0, \
				correct_scd if predicted == 0 else non_scds[predicted-1], \
				predicted_scd
		else:
			return predicted == 0

	def _predict(self, sentence, choices):
		"""
			Chooses one of the given SCDs for a sentence.

			Args:
				sentence (str): The sentence to choose an SCD for
				choices (list of str): The SCDs to choose from; the first one is handed
					to `_single_prediction` as the correct one
			Returns:
				index of the chosen SCD in `choices`, a message with text and chosen SCD,
				a message with the SCD predicted by the matrix
		"""
		try:
			words = self.preprocessor.preprocess_words(sentence)
		except Exception:
			# Fall back to the default preprocessor (e.g. if none is set yet).
			# `Exception` instead of a bare except, so that KeyboardInterrupt and
			# SystemExit are not swallowed.
			self.preprocessor = DefaultPreprocessor()
			words = self.preprocessor.preprocess_words(sentence)

		choices_words = [self.preprocessor.preprocess_words(c) for c in choices]

		_, predicted_scd_id = self._get_scd(words)
		# NOTE(review): the raw `sentence` is forwarded here, while `_single_prediction`
		# documents a list of str -- confirm what the inverse annotator expects
		_, chosen_scd, predicted_scd = self._single_prediction(sentence, predicted_scd_id, choices_words[0], choices_words[1:], return_more=True)

		c_id = choices_words.index(chosen_scd)

		return c_id, \
			"Text: '" + sentence + "'; SCD: '" + choices[c_id] + "'", \
			"Would predict as SCD: '" + ' '.join(predicted_scd) + "'"
View Source
class iSCDMatrix(SCDMatrix):
	'''
		Represents a model detecting inline SCDs (iSCDs) via a trained SCD matrix.  
		See [Identifying Subjective Content Descriptions among Text](http://ifis.uni-luebeck.de/uploads/tx_wapublications/identifying_scds_among_texts_public.pdf) for more information.

		**This model does not use a GPU and never will. The calculation of SCD similarity values via `_get_scds` (used
		by e.g. `train()` and `evaluate()` ) uses multiple cores by default.**
	'''

	def __init__(self,
			annotated_corpus_train, annotated_corpus_eval,
			manual_threshold=None,
			**kwargs
		):
		"""
			Args:
				annotated_corpus_train: Corpus iterated for training, forwarded to the base class
				annotated_corpus_eval: Corpus iterated for evaluation, forwarded to the base class
				manual_threshold (float): Threshold to use instead of the self-estimated one;
					has to be in [0.01, 0.99]. If `None`, the threshold is estimated in `_init_subclass`.
				**kwargs: Forwarded unchanged to the base class constructor
			Raises:
				AttributeError: If `manual_threshold` is given but lies outside of [0.01, 0.99]
		"""
		if manual_threshold is not None and (manual_threshold < 0.01 or manual_threshold > 0.99):
			raise AttributeError("The manual_threshold needs to be a float in [0.01, 0.99] or None to self estimate!")
		# stored before calling the base constructor
		# (presumably the base constructor may already trigger `_init_subclass` -- TODO confirm)
		self.manual_threshold = manual_threshold

		super().__init__(annotated_corpus_train, annotated_corpus_eval, **kwargs)

	def _get_scds_generator_train(self):
		"""
			Iterates the inline SCD texts of the training corpus.

			Yields:
				`(sentence, True)` for each sentence whose `is_scd` flag is truthy; other sentences are skipped
		"""
		for sentences, is_scds in self.annotated_corpus_train.iterate_inline_scd_texts():
			for sentence, is_scd in zip(sentences, is_scds):
				if is_scd:
					yield sentence, True

	def _init_subclass(self):
		"""
			Sets `self.threshold`.

			Without a manual threshold the value is either read from the cache file or
			estimated as the 70th percentile of column 0 of the `_get_scds` result for the
			inline SCDs of the training corpus (and cached if `self.write_model` is set).
			With a manual threshold that value is used directly.
		"""
		if self.manual_threshold is None:
			cache_filename = CacheName.filename(self.cache_name + "_special.json")
			if self.ignore_cache or not os.path.isfile(cache_filename):
				print("Training Step 4/4:")
				with tqdm(total=self.annotated_corpus_train.get_num_sentences()) as timeline:
					scd_sims = self._get_scds(self._get_scds_generator_train, timeline=timeline)

				scd_sims = np.array(scd_sims)
				# percentiles 0, 5, 10, ..., 100 of column 0
				percentiles = np.percentile(scd_sims[:, 0], [i * 5 for i in range(21)])

				self.threshold = percentiles[14] # means perc. 70

				if self.write_model:
					write_json_file(
						cache_filename, {
							'percentiles' : percentiles.tolist(),
							'threshold' : self.threshold
						})
			else:
				data = read_json_file(cache_filename)
				self.threshold = data['threshold']
		else:
			print("Training Step 4/4: Skipped cause manual_threshold defined")
			self.threshold = self.manual_threshold

	def _get_scds_generator_evaluate(self):
		"""
			Iterates the inline SCD texts of the evaluation corpus.

			Yields:
				`(sentence, is_scd)` for every sentence
		"""
		for sentences, is_scds in self.annotated_corpus_eval.iterate_inline_scd_texts():
			for sentence, is_scd in zip(sentences, is_scds):
				yield sentence, is_scd

	def _evaluate(self):
		"""
			Classifies every sentence of the evaluation corpus via the threshold.

			Returns:
				The result of `calc_metrics(labels, predictions)`
		"""
		# do the prediction
		print("Evaluate Step 1/1:")
		# NOTE(review): total is twice the number of sentences; the reason is not visible in this class
		with tqdm(total=self.annotated_corpus_eval.get_num_sentences()*2) as timeline:
			scd_sims = self._get_scds(self._get_scds_generator_evaluate, timeline=timeline)

		scd_sims = np.array(scd_sims) # to numpy and select columns
		labels = scd_sims[:, 2]
		predictions = scd_sims[:, 0]
		# use threshold and make "0", "1": a value below the threshold is predicted as 1
		predictions = np.where(predictions < self.threshold, 1, 0)

		return calc_metrics(labels, predictions)

	def _predict(self, sentence):
		"""
			Predicts for a single sentence if it is an inline SCD.

			Args:
				sentence: The sentence, handed to the preprocessor's `preprocess_words`
			Returns:
				is_scd, a message about the decision, a message with the text of the SCD found by `_get_scd`
		"""
		try:
			words = self.preprocessor.preprocess_words(sentence)
		except Exception:
			# Fall back to the default preprocessor (e.g. if none is set yet).
			# `Exception` instead of a bare except, so that KeyboardInterrupt and
			# SystemExit are not swallowed.
			self.preprocessor = DefaultPreprocessor()
			words = self.preprocessor.preprocess_words(sentence)

		s, i = self._get_scd(words)
		is_scd = s < self.threshold

		return is_scd, "Seems to be " + ("a" if is_scd else "no" ) + " scd!", "Would predict as SCD: '" + ' '.join(self._get_scd_text(i)) + "'"

Represents a model detecting inline SCDs (iSCDs) via a trained SCD matrix.
See Identifying Subjective Content Descriptions among Text for more information.

This model does not use a GPU and never will. The calculation of SCD similarity values via _get_scds (used by e.g. train() and evaluate()) uses multiple cores by default.

#   iSCDMatrix( annotated_corpus_train, annotated_corpus_eval, manual_threshold=None, **kwargs )
View Source
	def __init__(self,
			annotated_corpus_train, annotated_corpus_eval,
			manual_threshold=None,
			**kwargs
		):
		"""
			Args:
				annotated_corpus_train: Forwarded to the base class constructor
				annotated_corpus_eval: Forwarded to the base class constructor
				manual_threshold (float): Threshold to use instead of the self-estimated one;
					has to be in [0.01, 0.99]. `None` means the threshold is self-estimated.
				**kwargs: Forwarded unchanged to the base class constructor
			Raises:
				AttributeError: If `manual_threshold` is given but lies outside of [0.01, 0.99]
		"""
		# identity check: `None` is a singleton and must not go through `!=`
		if manual_threshold is not None and (manual_threshold < 0.01 or manual_threshold > 0.99):
			raise AttributeError("The manual_threshold needs to be a float in [0.01, 0.99] or None to self estimate!")
		self.manual_threshold = manual_threshold

		super().__init__(annotated_corpus_train, annotated_corpus_eval, **kwargs)
Args
  • manual_threshold (float): Threshold to use instead of the self-estimated one (estimated automatically if None)
View Source
class MPSCDMatrix(SCDMatrix):
	'''
		Represents a model predicting most probable SCDs (MPSCDs) via a trained SCD matrix.  
		See [Augmenting and Automating Corpus Enrichment](http://ifis.uni-luebeck.de/uploads/tx_wapublications/ws-ijsc_public.pdf) for more information.

		**This model does not use a GPU and never will. The calculation of SCD similarity values via `_get_scds` (used
		by e.g. `train()` and `evaluate()` ) uses multiple cores by default.**
	'''

	NUM_CHOICES = 4
	'''
		Defines the number of SCDs from which the system has to choose the matching one.
	'''

	def __init__(self,
			annotated_corpus_train, annotated_corpus_eval,
			scd_mapping='d2v',
			**kwargs
		):
		"""
			Args:
				annotated_corpus_train: Corpus iterated for training, forwarded to the base class
				annotated_corpus_eval: Corpus iterated for evaluation, forwarded to the base class
				scd_mapping (str of 'd2v', 'ia'): How to map the SCD predicted by the model to one of the given multiple choice SCDs.
					May use Doc2Vec 'd2v' or an inverse annotator 'ia' (if supported by used annotator)
				**kwargs: Forwarded unchanged to the base class constructor
			Raises:
				AttributeError: If `scd_mapping` is neither 'd2v' nor 'ia'
		"""
		if scd_mapping not in ('d2v', 'ia'):
			raise AttributeError("The scd_mapping needs to be one of 'd2v', 'ia'!")
		self.scd_mapping = scd_mapping

		super().__init__(annotated_corpus_train, annotated_corpus_eval, **kwargs)

	def _tagged_documents_generator_train(self):
		"""
			Yields one `TaggedDocument` per SCD of the training corpus, tagged with
			the id looked up in `self.scd_map`.
		"""
		for _, scds in self.annotated_corpus_train.iterate_sentence_scds(n=self.num_scds_train):
			for scd in scds:
				# key of the map is the string form of the tuple of dictionary indices
				scd_id = self.scd_map[str(tuple(self.dict.doc2idx(scd)))]
				yield TaggedDocument(scd, [scd_id])


	def _init_subclass(self):
		"""
			Prepares the mapping of predicted SCDs to the multiple choice SCDs.

			For 'd2v' a Doc2Vec model is trained (and saved if `self.write_model` is set)
			or loaded from the cache file; for 'ia' only a random generator is created.
		"""
		if self.scd_mapping == 'd2v':
			cache_filename = CacheName.filename(self.cache_name + ".d2v")
			if self.ignore_cache or not os.path.isfile(cache_filename):

				print("Training Step 4/4:")
				self.doc2vec_model = Doc2Vec(
						GeneratorToIterator(self._tagged_documents_generator_train),
						workers=self.num_processes
					)

				if self.write_model:
					self.doc2vec_model.save(cache_filename)
			else:
				self.doc2vec_model = Doc2Vec.load(cache_filename)
		else:
			print("Training Step 4/4: Skipped cause using inverse annotator")
			self.local_random = Random.get_generator()

	def _sentence_vector(self, sentence):
		"""
			Returns the vector inferred by the Doc2Vec model for `sentence`.
		"""
		return self.doc2vec_model.infer_vector(sentence)

	def _get_scds_generator_evaluate(self):
		"""
			Yields `(sentence, i)` for the evaluation corpus, where `i` is the running
			index of the sentence in the iteration.
		"""
		for i, (sentence, _, _) in enumerate(self.annotated_corpus_eval.iterate_sentence_scds_non_scds(matching_n=1, non_matching_n=MPSCDMatrix.NUM_CHOICES-1)):
			yield sentence, i

	def _evaluate(self):
		"""
			Evaluates the multiple choice selection on the evaluation corpus.

			Returns:
				dict with 'accuracy' (share of correct choices, `0.0` if no sentence
				could be evaluated) and 'num' (number of evaluated sentences)
		"""
		# select the "best" scd each
		print("Evaluate Step 1/2:")
		with tqdm(total=self.annotated_corpus_eval.get_num_sentences()) as timeline:
			best_scds = self._get_scds(self._get_scds_generator_evaluate, timeline=timeline)

		best_scds = np.array(best_scds) # to numpy
		if best_scds.ndim == 2: # an empty result has no columns to sort by
			best_scds = best_scds[best_scds[:,2].argsort()] # sort by 'i' as index of sentence when iterating via generator

		print("Evaluate Step 2/2:")
		count_correct, count_all = 0, 0
		with tqdm(total=self.annotated_corpus_eval.get_num_sentences()) as timeline:
			# the generator is stable!
			for i, (sentence, scds, non_scds) in enumerate(self.annotated_corpus_eval.iterate_sentence_scds_non_scds(matching_n=1, non_matching_n=MPSCDMatrix.NUM_CHOICES-1)):
				# only sentences with exactly one correct and at least one wrong scd are counted
				if len(scds) == 1 and len(non_scds) > 0:
					predicted_scd_id = int(best_scds[i,1])
					if self._single_prediction(sentence, predicted_scd_id, scds[0], non_scds):
						count_correct += 1

					count_all += 1

				timeline.update(1)

		return {
			# avoid a ZeroDivisionError if no sentence qualified
			'accuracy' : count_correct / count_all if count_all > 0 else 0.0,
			'num' : count_all
		}

	def _single_prediction(self, sentence, predicted_scd_id, correct_scd, non_scds, return_more=False):
		"""
			Does a single multiple choice prediction, i.e.:

			Transforms the predicted scd by the matrix into a Doc2Vec space, also
			all possible choices are transformed into the same space. Then chooses 
			the scd with the most similar vector in the space from merge(correct_scd, non_scds).

			With `scd_mapping='ia'` the inverse annotator of the evaluation corpus decides
			if the predicted scd is similar to the correct one; if not, a random wrong scd is chosen.

			Args:
				sentence (list of str): Sentence to select the scd for
				predicted_scd_id (int): The id of the scd predicted by the matrix for the sentence
				correct_scd (list of str): The correct scd of the list of scds to choose from
				non_scds (list of list of str): The wrong scds of the list of scds to choose from
				return_more (bool): If the chosen and the predicted scd shall be returned, too
			Returns:
				Boolean if model chose correctly if `return_more=False`; else  
				Boolean if model chose correctly, chosen_scd (list of str), predicted_scd (list of str)
		"""
		predicted_scd = self._get_scd_text(predicted_scd_id)

		if self.scd_mapping == 'd2v':
			predicted_vector = self._sentence_vector(predicted_scd)
			# row 0 is the correct scd, the following rows are the wrong scds
			# (same order of inference calls as before: predicted, correct, wrong ones)
			choice_vectors = np.array(
				[self._sentence_vector(choice) for choice in [correct_scd, *non_scds]],
				dtype=np.float64
			)

			# cosine similarity
			dot_products = np.dot(choice_vectors, predicted_vector)
			dividers = np.linalg.norm(choice_vectors, axis=-1) * np.linalg.norm(predicted_vector)
			# similarity is defined as 0 where a vector has norm 0
			similarities = np.divide(
				dot_products, dividers,
				out=np.zeros_like(dot_products),
				where=(dividers!=0)
			)

			predicted = np.argmax(similarities)

		else:
			if self.annotated_corpus_eval.inverse_annotator_is_similar_annotation(sentence, correct_scd, predicted_scd):
				predicted = 0
			else:
				predicted = self.local_random.randrange(len(non_scds)) + 1 # a wrong scd

		if return_more:
			return predicted == 0, \
				correct_scd if predicted == 0 else non_scds[predicted-1], \
				predicted_scd
		else:
			return predicted == 0

	def _predict(self, sentence, choices):
		"""
			Chooses one of the given SCDs for a sentence.

			Args:
				sentence (str): The sentence to choose an SCD for
				choices (list of str): The SCDs to choose from; the first one is handed
					to `_single_prediction` as the correct one
			Returns:
				index of the chosen SCD in `choices`, a message with text and chosen SCD,
				a message with the SCD predicted by the matrix
		"""
		try:
			words = self.preprocessor.preprocess_words(sentence)
		except Exception:
			# Fall back to the default preprocessor (e.g. if none is set yet).
			# `Exception` instead of a bare except, so that KeyboardInterrupt and
			# SystemExit are not swallowed.
			self.preprocessor = DefaultPreprocessor()
			words = self.preprocessor.preprocess_words(sentence)

		choices_words = [self.preprocessor.preprocess_words(c) for c in choices]

		_, predicted_scd_id = self._get_scd(words)
		# NOTE(review): the raw `sentence` is forwarded here, while `_single_prediction`
		# documents a list of str -- confirm what the inverse annotator expects
		_, chosen_scd, predicted_scd = self._single_prediction(sentence, predicted_scd_id, choices_words[0], choices_words[1:], return_more=True)

		c_id = choices_words.index(chosen_scd)

		return c_id, \
			"Text: '" + sentence + "'; SCD: '" + choices[c_id] + "'", \
			"Would predict as SCD: '" + ' '.join(predicted_scd) + "'"

Represents a model predicting most probable SCDs (MPSCDs) via a trained SCD matrix.
See Augmenting and Automating Corpus Enrichment for more information.

This model does not use a GPU and never will. The calculation of SCD similarity values via _get_scds (used by e.g. train() and evaluate()) uses multiple cores by default.

#   MPSCDMatrix( annotated_corpus_train, annotated_corpus_eval, scd_mapping='d2v', **kwargs )
View Source
	def __init__(self,
			annotated_corpus_train, annotated_corpus_eval,
			scd_mapping='d2v',
			**kwargs
		):
		"""
			Validates and stores the mapping mode, then hands the corpora and all
			further keyword arguments on to the base class constructor.

			Args:
				scd_mapping (str of 'd2v', 'ia'): Mode used to pick one of the multiple choice SCDs
					for the SCD predicted by the model; either Doc2Vec 'd2v' or an
					inverse annotator 'ia' (if supported by used annotator)
			Raises:
				AttributeError: If `scd_mapping` is neither 'd2v' nor 'ia'
		"""
		is_known_mapping = scd_mapping == 'd2v' or scd_mapping == 'ia'
		if not is_known_mapping:
			raise AttributeError("The scd_mapping needs to be one of 'd2v', 'ia'!")

		self.scd_mapping = scd_mapping
		super().__init__(
			annotated_corpus_train,
			annotated_corpus_eval,
			**kwargs
		)
Args
  • scd_mapping (str of 'd2v', 'ia'): How to map the SCD predicted by the model to one of the given multiple-choice SCDs. May use Doc2Vec 'd2v' or an inverse annotator 'ia' (if supported by the annotator in use)
#   NUM_CHOICES = 4

Defines the number of SCDs from which the system has to choose the matching one.