core.corpus.annotators

View Source
import os, json, re, math, csv

from nltk.util import everygrams

from core.corpus.annotator import Annotator, InverseAnnotator
import core.utils.const as const

class Wiktionary(Annotator):
	"""
		Annotator using definitions from the Wiktionary dictionary.

		Dataset source: <https://en.wiktionary.org/>
	"""

	DATAFILE = os.path.join(const.DATASET_DIR, "wiktionary", "data.json")

	def __init__(self, **kwargs):
		super().__init__(**kwargs)

		if self._is_cached(suffix='map'):
			self.map = self._get_cached(suffix='map')
		else:
			self._create_map()

		self._shuffle_annotations()
		self.max_gram = max([int(k) for k in self.map.keys()])

	def _create_map(self):
		self.map = {}
		with open(Wiktionary.DATAFILE, "r", errors='ignore') as f:
			for line in f:
				line = json.loads(line)

				annotation_text = re.sub('<[^<]+?>', '', line['annotation'][0]) # strip HTML tags (from the Wiktionary markup)
				annotations = self.preprocessor.preprocess_document(annotation_text)
				if len(annotations) > 1: # the last sentence is often low quality when there are several
					annotations = annotations[:-1]

				for annotation in annotations: # each sentence becomes one possible annotation
					if len(annotation) >= 4:
						if annotation[0] == '#':
							annotation = annotation[1:]
						key = self.preprocessor.preprocess_words(line['key'].lower())
						if len(key) > 0:
							num_words = str(len(key))
							
							if num_words not in self.map:
								self.map[num_words] = {}

							key = '-'.join(key)

							if key not in self.map[num_words]:
								self.map[num_words][key] = []

							self.map[num_words][key].append(annotation)

		self._set_cached(self.map, suffix='map')

	def _shuffle_annotations(self):
		anno_count = 0
		# for each length (unigram, bigram, ...)
		for num_words in self.map:

			single_keys, del_keys = [], []
			# iterate over each key
			for key in self.map[num_words]:
				self.random.shuffle(self.map[num_words][key]) # to randomize selected annotations later

				if self.use_percentages: # split annotator?
					cur_len = len(self.map[num_words][key])
					if cur_len >= self.len_percentages: # at least one annotation per percentage possible? -> split!
						start = math.ceil(self.part_start * cur_len)
						end = start + math.ceil(self.part_len * cur_len)

						self.map[num_words][key] = self.map[num_words][key][start:end]

						# remove if empty
						if len(self.map[num_words][key]) < 1:
							del_keys.append(key)

					else: # store for later
						single_keys.append(key)	

				anno_count += len(self.map[num_words][key])

			# delete keys marked as empty
			for key in del_keys:
				del self.map[num_words][key]

			if self.use_percentages: # for keys too small to split, keep only this part's share of the keys
				cur_len = len(single_keys)
				start = math.ceil(self.part_start * cur_len)
				end = start + math.ceil(self.part_len * cur_len)

				for key in single_keys[:start] + single_keys[end:]:
					anno_count -= len(self.map[num_words][key])
					del self.map[num_words][key]

		if anno_count < 40000: # maximum of 293,296 annotations if no splitting is used
			print("======================")
			print("Very small annotator created, contains only", anno_count, "annotations!")
			print("======================")

		self.shuffled_keys = list(self.map["1"].keys())
		self.random.shuffle(self.shuffled_keys)

	def _get_annotations(self, sentence, n):
		if len(sentence) <= 0: # no words?
			return []
		if n == -1: # "all" annotations?
			n = 1000

		sentence = list(map(lambda w: w.lower(), sentence)) # map keys are lowercase, so lowercase the sentence too

		annotations = []	
		for gram in everygrams(sentence, min_len=2, max_len=self.max_gram):
			grl = str(len(gram))
			if grl in self.map:
				key = '-'.join(gram)
				if key in self.map[grl]:
					annotations.extend(self.map[grl][key][:n])
		
		for word in sentence:
			if word in self.map["1"]:
				annotations.extend(self.map["1"][word][:n])

		l_an = len(annotations)
		if l_an < n: # fewer matches than requested -> return them all
			return annotations[:n]
		else: # deterministically sample n annotations spread across the matches
			return [annotations[(i + i*n + self.random_seed) % l_an] for i in range(n)]
		
	def _get_non_annotations(self, sentence, n):
		if n == -1: # "all" annotations?
			n = 1000

		sentence = list(map(lambda w: w.lower(), sentence)) # map keys are lowercase, so lowercase the sentence too

		# randomize starting index
		s_index = self.random_seed % len(sentence)
		if sentence[s_index] in self.shuffled_keys:
			k_index = self.shuffled_keys.index(sentence[s_index]) - 1
			if k_index < 0:
				k_index = len(self.shuffled_keys) - 1
		else:
			k_index = 0

		annotations = []
		while_count = 0
		while len(annotations) < n:

			if self.shuffled_keys[k_index] not in sentence:
				annotations.append(self.map["1"][self.shuffled_keys[k_index]][0])

			k_index = (k_index + len(sentence)) % len(self.shuffled_keys)

			while_count += 1
			if while_count > 1000: # prevent endless loops
				break

		return annotations
		
	def _get_cachename(self):
		return "wiktionary-" + self.preprocessor_name + "-"

	def get_inverse_annotator(self):
		return InverseWiktionary(self.preprocessor)

class InverseWiktionary(InverseAnnotator):

	ANNOTATOR_CLASS = Wiktionary
	
	def _init(self, cache_data):
		self.inverse_map = cache_data

	def _create_cache_data(self):
		cache_data = {}
		# for each length (unigram, bigram, ...)
		for num_words in self.annotator.map:
			for word, scds in self.annotator.map[num_words].items():
				if '-' in word:
					words = word.split('-')
				else:
					words = [word]
				for scd in scds:
					scd_key = str(tuple(scd))
					if scd_key not in cache_data:
						cache_data[scd_key] = []
					cache_data[scd_key].extend(words)

		return cache_data

	def is_annotation(self, sentence, annotation):
		scd_key = str(tuple(annotation))
		if scd_key in self.inverse_map:
			words = self.inverse_map[scd_key]
			sentence = list(map(lambda w: w.lower(), sentence)) # map keys are lowercase, so lowercase the sentence too
			return len(set(sentence) & set(words)) >= 1
		else:
			return False # unknown annotation

class Quotes(Annotator):
	"""
		Annotator using quotes from the Quotes-500K dataset.

		Dataset source: <https://github.com/ShivaliGoel/Quotes-500K>
	"""

	DATAFILE = os.path.join(const.DATASET_DIR, "quotes", "data.csv")

	def __init__(self, **kwargs):
		super().__init__(**kwargs)

		if self._is_cached(suffix='map') and self._is_cached(suffix='texts'):
			self.map = self._get_cached(suffix='map')
			self.texts = self._get_cached(suffix='texts')
		else:
			self._create_map_texts()

		self._shuffle_annotations()

	def _create_map_texts(self):
		self.map = {}
		self.texts = []
		with open(Quotes.DATAFILE, "r", errors='ignore') as f:
			count = 0
			for line in csv.reader(f):
				annotation_texts = self.preprocessor.preprocess_document(line[0])
				annotation_keywords = self.preprocessor.preprocess_words(line[2])
				
				if sum([len(at) for at in annotation_texts]) > 4:
					added = False
					for annotation_keyword in annotation_keywords:
						annotation_keyword = annotation_keyword.lower()

						if len(annotation_keyword) > 1:
							if annotation_keyword not in self.map:
								self.map[annotation_keyword] = []
							self.map[annotation_keyword].append(count)
							added = True

					if added:
						self.texts.append(annotation_texts)
						count += 1	

		self._set_cached(self.map, suffix='map')
		self._set_cached(self.texts, suffix='texts')

	def _shuffle_annotations(self):
		anno_count = 0
	
		single_keys, del_keys = [], []
		# iterate over each key
		for key in self.map:
			self.random.shuffle(self.map[key]) # to randomize selected annotations later

			if self.use_percentages: # split annotator?
				cur_len = len(self.map[key])
				if cur_len >= self.len_percentages: # at least one annotation per percentage possible? -> split!
					start = math.ceil(self.part_start * cur_len)
					end = start + math.ceil(self.part_len * cur_len)

					self.map[key] = self.map[key][start:end]

					# remove if empty
					if len(self.map[key]) < 1:
						del_keys.append(key)

				else: # store for later
					single_keys.append(key)	

			anno_count += len(self.map[key])

		# delete keys marked as empty
		for key in del_keys:
			del self.map[key]

		if self.use_percentages: # for keys too small to split, keep only this part's share of the keys
			cur_len = len(single_keys)
			start = math.ceil(self.part_start * cur_len)
			end = start + math.ceil(self.part_len * cur_len)

			for key in single_keys[:start] + single_keys[end:]:
				anno_count -= len(self.map[key])
				del self.map[key]

		if anno_count < 40000: # maximum of > 500,000 annotations if no splitting is used
			print("======================")
			print("Very small annotator created, contains only", anno_count, "annotations!")
			print("======================")

		self.shuffled_keys = list(self.map.keys())
		self.random.shuffle(self.shuffled_keys)

	def _get_annotations(self, sentence, n):
		if len(sentence) <= 0: # no words?
			return []
		if n == -1: # "all" annotations?
			n = 1000

		sentence = list(map(lambda w: w.lower(), sentence)) # map keys are lowercase, so lowercase the sentence too

		annotations = []	
		for word in sentence:
			if word in self.map:
				annotations.extend(self.map[word][:n])

		l_an = len(annotations)
		if l_an < n: # fewer matches than requested -> use them all
			anno_ids = annotations[:n]
		else: # deterministically sample n annotation ids spread across the matches
			anno_ids = [annotations[(i + i*n + self.random_seed) % l_an] for i in range(n)]

		return self._annotation_id2text(anno_ids)
		
	def _annotation_id2text(self, anno_ids, only_flatten=False):
		if only_flatten:
			anno_texts = anno_ids
		else:
			anno_texts = [self.texts[int(anno_id)] for anno_id in anno_ids] # convert ids back to texts
		return [[word for sentence in anno for word in sentence] for anno in anno_texts] # merge each annotation's sentences into one flat word list
		
	def _get_non_annotations(self, sentence, n):
		if n == -1: # "all" annotations?
			n = 1000

		sentence = list(map(lambda w: w.lower(), sentence)) # map keys are lowercase, so lowercase the sentence too

		# get ids not to use!
		disallowed_ids = set()
		for word in sentence:
			if word in self.map:
				disallowed_ids.update(self.map[word])

		# randomize starting index
		s_index = self.random_seed % len(sentence)
		if sentence[s_index] in self.shuffled_keys:
			k_index = self.shuffled_keys.index(sentence[s_index]) - 1
			if k_index < 0:
				k_index = len(self.shuffled_keys) - 1
		else:
			k_index = 0

		annotations = set()
		while_count = 0
		while len(annotations) < n:
			if self.shuffled_keys[k_index] not in sentence:
				annotations.update(self.map[self.shuffled_keys[k_index]])
			annotations -= disallowed_ids # make sure to not use disallowed ones

			k_index = (k_index + len(sentence)) % len(self.shuffled_keys)

			while_count += 1
			if while_count > 1000: # prevent endless loops
				break

		return self._annotation_id2text(annotations)[:n]
		
	def _get_cachename(self):
		return "quotes-" + self.preprocessor_name + "-"

	def get_inverse_annotator(self):
		return InverseQuotes(self.preprocessor)

class InverseQuotes(InverseAnnotator):

	ANNOTATOR_CLASS = Quotes
	
	def _init(self, inverse_map):
		self.inverse_map = inverse_map

		try:
			self.inverse_texts
		except AttributeError: # not built in this run -> load from cache
			self.inverse_texts = self.annotator._get_cached(suffix='inversed_texts')

	def _create_cache_data(self):
		self.inverse_texts = {}
		for count, anno_text in enumerate(self.annotator.texts):
			anno_text = str(tuple(self.annotator._annotation_id2text([anno_text], only_flatten=True).pop(0)))
			if anno_text not in self.inverse_texts:
				self.inverse_texts[anno_text] = []
			self.inverse_texts[anno_text].append(count)

		self.annotator._set_cached(self.inverse_texts, suffix='inversed_texts')

		inverse_map = {}
		for keyword, anno_ids in self.annotator.map.items():
			for anno_id in anno_ids:
				anno_id = str(anno_id)
				if anno_id not in inverse_map:
					inverse_map[anno_id] = []
				inverse_map[anno_id].append(keyword)

		return inverse_map

	def is_annotation(self, sentence, annotation):
		annotation = str(tuple(annotation))
		if annotation in self.inverse_texts:
			anno_keywords = []
			for anno_id in self.inverse_texts[annotation]:
				anno_keywords.extend(self.inverse_map[str(anno_id)])

			sentence = list(map(lambda w: w.lower(), sentence)) # map keys are lowercase, so lowercase the sentence too
			return len(set(sentence) & set(anno_keywords)) >= 1
		else:
			return False # unknown annotation
		
		
#   class Wiktionary(core.corpus.annotator.Annotator):

Annotator using definitions from the Wiktionary dictionary.

Dataset source: https://en.wiktionary.org/
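
Each line of the data file is a standalone JSON object. A minimal sketch of the expected shape, inferred from _create_map above (the concrete field contents here are hypothetical):

import json

# Hypothetical example line from wiktionary/data.json: the defined term is
# stored under "key", HTML-tagged definition strings under "annotation".
line = json.loads('{"key": "black hole", "annotation": ["<i>A region of spacetime.</i>"]}')
print(line['key'])           # -> black hole
print(line['annotation'][0]) # -> <i>A region of spacetime.</i>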

#   Wiktionary(**kwargs)
Args
  • percentages (array of float): Annotators do not support true splitting, but one may select a subset of the possible annotations by giving percentages. E.g., to get one subset with 30% and one with 70% of the annotations, use percentages=[0.3, 0.7].
  • part (int): Selects one of the percentages defined by percentages. With percentages=[0.3, 0.7], part=0 selects the 30% subset and part=1 the 70% subset.
  • preprocessor (core.corpus.preprocess.Preprocessor): The preprocessor to use; if None, core.corpus.preprocess.DefaultPreprocessor is used.
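
A minimal usage sketch of the splitting arguments (a hypothetical setup, not output of the real dataset):

from core.corpus.annotators import Wiktionary

# Build two annotators over disjoint subsets of the annotations,
# one holding roughly 30% and the other roughly 70%.
part_a = Wiktionary(percentages=[0.3, 0.7], part=0) # 30% subset
part_b = Wiktionary(percentages=[0.3, 0.7], part=1) # 70% subset
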
#   DATAFILE = '/home/user/data/wiktionary/data.json'
#   def get_inverse_annotator(self):

Get an instance of core.corpus.annotator.InverseAnnotator for the
Annotator.

#   class InverseWiktionary(core.corpus.annotator.InverseAnnotator):

To check predicted annotations, it is sometimes necessary to map back from an annotation to the text.

This class allows checking how well a sentence and an SCD (annotation) match for a given annotator.
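
The inverse cache maps each stringified annotation tuple back to the key words it defines. A hypothetical sketch of its shape, inferred from _create_cache_data above (the entries are made up):

# One definition sentence mapped back to the words of its (bigram) key.
inverse_map = {
	"('a', 'region', 'of', 'spacetime')": ["black", "hole"],
}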

#   def is_annotation(self, sentence, annotation):

Checks whether the sentence could receive the annotation.

Args
  • sentence (list of str): The annotated sentence
  • annotation (list of str): The annotation given for the sentence
Returns

bool: whether the annotation is possible for the sentence
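
A round-trip sketch using the private helpers shown in the source above (the sentence is arbitrary; actual results depend on the cached dataset):

from core.corpus.annotators import Wiktionary

annotator = Wiktionary()
inverse = annotator.get_inverse_annotator()

sentence = ["the", "black", "hole", "grows"]
for annotation in annotator._get_annotations(sentence, 2):
	print(inverse.is_annotation(sentence, annotation)) # expected: True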

#   class InverseWiktionary.ANNOTATOR_CLASS(core.corpus.annotator.Annotator):

Alias of Wiktionary; see the class documentation above.

#   class Quotes(core.corpus.annotator.Annotator):

Annotator using quotes from the Quotes-500K dataset.

Dataset source: https://github.com/ShivaliGoel/Quotes-500K
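
The data file is read with csv.reader; _create_map_texts above takes column 0 as the quote text and column 2 as the keywords. A hypothetical row illustrating that layout (the middle column, presumably the author, is ignored):

import csv, io

row = next(csv.reader(io.StringIO('"Stay hungry, stay foolish.","Steve Jobs","motivation, life"')))
quote_text, keywords = row[0], row[2]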

#   Quotes(**kwargs)
Args
  • percentages (array of float): Annotators do not support true splitting, but one may select a subset of the possible annotations by giving percentages. E.g., to get one subset with 30% and one with 70% of the annotations, use percentages=[0.3, 0.7].
  • part (int): Selects one of the percentages defined by percentages. With percentages=[0.3, 0.7], part=0 selects the 30% subset and part=1 the 70% subset.
  • preprocessor (core.corpus.preprocess.Preprocessor): The preprocessor to use; if None, core.corpus.preprocess.DefaultPreprocessor is used.
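
A sketch of passing a custom preprocessor (the DefaultPreprocessor constructor signature is an assumption here):

from core.corpus.annotators import Quotes
from core.corpus.preprocess import DefaultPreprocessor

# Falls back to DefaultPreprocessor when the argument is omitted or None.
annotator = Quotes(preprocessor=DefaultPreprocessor())
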
#   DATAFILE = '/home/user/data/quotes/data.csv'
#   def get_inverse_annotator(self):

Get an instance of core.corpus.annotator.InverseAnnotator for the
Annotator.

#   class InverseQuotes(core.corpus.annotator.InverseAnnotator):

To check predicted annotations, it is sometimes necessary to map back from an annotation to the text.

This class allows checking how well a sentence and an SCD (annotation) match for a given annotator.
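
InverseQuotes keeps two structures: inverse_texts maps a stringified annotation text to quote ids, and inverse_map maps each quote id back to its keywords. A hypothetical sketch of their shapes, inferred from _create_cache_data above (the entries are made up):

inverse_texts = {"('stay', 'hungry', 'stay', 'foolish')": [0]}
inverse_map = {"0": ["motivation", "life"]}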

#   def is_annotation(self, sentence, annotation):

Checks whether the sentence could receive the annotation.

Args
  • sentence (list of str): The annotated sentence
  • annotation (list of str): The annotation given for the sentence
Returns

bool: whether the annotation is possible for the sentence
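
A complementary sketch checking that sampled non-annotations are rejected (uses the private helper shown in the source above; actual results depend on the cached dataset):

from core.corpus.annotators import Quotes

annotator = Quotes()
inverse = annotator.get_inverse_annotator()

sentence = ["music", "is", "life"]
for non_annotation in annotator._get_non_annotations(sentence, 3):
	print(inverse.is_annotation(sentence, non_annotation)) # expected: False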

#   class InverseQuotes.ANNOTATOR_CLASS(core.corpus.annotator.Annotator):

Alias of Quotes; see the class documentation above.