core.corpus.annotators
View Source

```python
import os, json, re, math, csv

from nltk.util import everygrams

from core.corpus.annotator import Annotator, InverseAnnotator
import core.utils.const as const
```
class Wiktionary

View Source

```python
class Wiktionary(Annotator):
    """
    Annotator using definitions from the Wiktionary dictionary.

    Dataset source: <https://en.wiktionary.org/>
    """

    DATAFILE = os.path.join(const.DATASET_DIR, "wiktionary", "data.json")

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        if self._is_cached(suffix='map'):
            self.map = self._get_cached(suffix='map')
        else:
            self._create_map()
        self._shuffle_annotations()

        self.max_gram = max([int(k) for k in self.map.keys()])

    def _create_map(self):
        self.map = {}
        with open(Wiktionary.DATAFILE, "r", errors='ignore') as f:
            for line in f:
                line = json.loads(line)
                annotation_text = re.sub('<[^<]+?>', '', line['annotation'][0])  # remove tags (from wiktionary)
                annotations = self.preprocessor.preprocess_document(annotation_text)
                if len(annotations) > 1:  # last sentence is often not good, when there are multiple
                    annotations = annotations[:-1]
                for annotation in annotations:  # each sentence as one poss. annotation
                    if len(annotation) >= 4:
                        if annotation[0] == '#':
                            annotation = annotation[1:]
                        key = self.preprocessor.preprocess_words(line['key'].lower())
                        if len(key) > 0:
                            num_words = str(len(key))
                            if num_words not in self.map:
                                self.map[num_words] = {}
                            key = '-'.join(key)
                            if key not in self.map[num_words]:
                                self.map[num_words][key] = []
                            self.map[num_words][key].append(annotation)
        self._set_cached(self.map, suffix='map')

    def _shuffle_annotations(self):
        anno_count = 0
        # for each length (unigram, bigram, ...)
        for num_words in self.map:
            single_keys, del_keys = [], []
            # iterate over each key
            for key in self.map[num_words]:
                self.random.shuffle(self.map[num_words][key])  # to randomize selected annotations later
                if self.use_percentages:  # splitted annotator?
                    cur_len = len(self.map[num_words][key])
                    if cur_len >= self.len_percentages:  # at least one annotation per percentage possible? -> split!
                        start = math.ceil(self.part_start * cur_len)
                        end = start + math.ceil(self.part_len * cur_len)
                        self.map[num_words][key] = self.map[num_words][key][start:end]
                        # remove if empty
                        if len(self.map[num_words][key]) < 1:
                            del_keys.append(key)
                    else:  # store for later
                        single_keys.append(key)
                anno_count += len(self.map[num_words][key])

            # delete keys marked as empty
            for key in del_keys:
                del self.map[num_words][key]

            if self.use_percentages:
                # delete "number of percentages" from single keys each
                cur_len = len(single_keys)
                start = math.ceil(self.part_start * cur_len)
                end = start + math.ceil(self.part_len * cur_len)
                for key in single_keys[:start] + single_keys[end:]:
                    del self.map[num_words][key]
                    anno_count -= 1

        if anno_count < 40000:  # maximum of 293 296 annotations if no splitting used
            print("======================")
            print("Very small annotator created, contains only", anno_count, "annotations!")
            print("======================")

        self.shuffled_keys = list(self.map["1"].keys())
        self.random.shuffle(self.shuffled_keys)

    def _get_annotations(self, sentence, n):
        if len(sentence) <= 0:  # no words?
            return []
        if n == -1:  # "all" annotations?
            n = 1000

        sentence = list(map(lambda w: w.lower(), sentence))  # all map keys are "lowercase", should here, too

        annotations = []
        for gram in everygrams(sentence, min_len=2, max_len=self.max_gram):
            grl = str(len(gram))
            if grl in self.map:
                key = '-'.join(gram)
                if key in self.map[grl]:
                    annotations.extend(self.map[grl][key][:n])

        for word in sentence:
            if word in self.map["1"]:
                annotations.extend(self.map["1"][word][:n])

        l_an = len(annotations)
        if l_an + 1 <= n:
            return annotations[:n]
        else:
            return [annotations[(i + i*n + self.random_seed) % l_an] for i in range(n)]

    def _get_non_annotations(self, sentence, n):
        if n == -1:  # "all" annotations?
            n = 1000

        sentence = list(map(lambda w: w.lower(), sentence))  # all map keys are "lowercase", should here, too

        # randomize starting index
        s_index = self.random_seed % len(sentence)
        if sentence[s_index] in self.shuffled_keys:
            k_index = self.shuffled_keys.index(sentence[s_index]) - 1
            if k_index < 0:
                k_index = len(self.shuffled_keys) - 1
        else:
            k_index = 0

        annotations = []
        while len(annotations) < n:
            if self.shuffled_keys[k_index] not in sentence:
                annotations.append(self.map["1"][self.shuffled_keys[k_index]][0])
            k_index = (k_index + len(sentence)) % len(self.shuffled_keys)

        return annotations

    def _get_cachename(self):
        return "wiktionary-" + self.preprocessor_name + "-"

    def get_inverse_annotator(self):
        return InverseWiktionary(self.preprocessor)
```
Annotator using definitions from the Wiktionary dictionary.
Dataset source: https://en.wiktionary.org/
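To make the cached structure easier to follow, here is an illustration (with invented entries) of the map that `_create_map` builds: the outer key is the n-gram length as a string, the inner key is the hyphen-joined, lowercased, preprocessed form of a Wiktionary headword, and the value is a list of preprocessed definition sentences.

```python
# Illustration only (invented data), mirroring the layout built by _create_map.
example_map = {
    "1": {"fox": [["small", "wild", "canine"]]},
    "2": {"red-fox": [["eurasian", "species", "of", "fox"]]},
}
```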
Wiktionary(**kwargs)

View Source

```python
def __init__(self, **kwargs):
    super().__init__(**kwargs)

    if self._is_cached(suffix='map'):
        self.map = self._get_cached(suffix='map')
    else:
        self._create_map()
    self._shuffle_annotations()

    self.max_gram = max([int(k) for k in self.map.keys()])
```
Args

- percentages (array of float): Annotators do not support splitting, but one may select only a subset of the possible annotations by giving percentages. E.g., to get one subset using 30% and one using 70%, one would write `percentages=[0.3, 0.7]` (see the usage sketch below).
- part (int): Selects one of the percentages defined by `percentages`. For `percentages=[0.3, 0.7]`, setting `part=0` selects the 30% subset and `part=1` the 70% subset.
- preprocessor (`core.corpus.preprocess.Preprocessor`): The preprocessor to use; if `None`, `core.corpus.preprocess.DefaultPreprocessor` is used.
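A minimal usage sketch of the split arguments, assuming the keyword arguments behave exactly as documented above:

```python
from core.corpus.annotators import Wiktionary

# Hypothetical 30%/70% split of the available annotations.
small = Wiktionary(percentages=[0.3, 0.7], part=0)  # 30% subset
large = Wiktionary(percentages=[0.3, 0.7], part=1)  # 70% subset

# With no arguments, the full, unsplit annotator is created and the
# DefaultPreprocessor is used.
full = Wiktionary()
```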
get_inverse_annotator()

View Source

```python
def get_inverse_annotator(self):
    return InverseWiktionary(self.preprocessor)
```
Get an instance of `core.corpus.annotator.InverseAnnotator` for this `Annotator`.
class InverseWiktionary

View Source

```python
class InverseWiktionary(InverseAnnotator):

    ANNOTATOR_CLASS = Wiktionary

    def _init(self, cache_data):
        self.inverse_map = cache_data

    def _create_cache_data(self):
        cache_data = {}
        # for each length (unigram, bigram, ...)
        for num_words in self.annotator.map:
            for word, scds in self.annotator.map[num_words].items():
                if '-' in word:
                    words = word.split('-')
                else:
                    words = [word]
                for scd in scds:
                    scd_key = str(tuple(scd))
                    if scd_key not in cache_data:
                        cache_data[scd_key] = []
                    cache_data[scd_key].extend(words)
        return cache_data

    def is_annotation(self, sentence, annotation):
        scd_key = str(tuple(annotation))
        if scd_key in self.inverse_map:
            words = self.inverse_map[scd_key]
            sentence = list(map(lambda w: w.lower(), sentence))  # all map keys are "lowercase", should here, too
            return len(set(sentence) & set(words)) >= 1
        else:
            return False  # unknown annotation
```
To check predicted annotations, it is sometimes necessary to map back from an annotation to the text.
This class allows computing the similarity between a sentence and an SCD for a given annotator.
is_annotation(sentence, annotation)

View Source

```python
def is_annotation(self, sentence, annotation):
    scd_key = str(tuple(annotation))
    if scd_key in self.inverse_map:
        words = self.inverse_map[scd_key]
        sentence = list(map(lambda w: w.lower(), sentence))  # all map keys are "lowercase", should here, too
        return len(set(sentence) & set(words)) >= 1
    else:
        return False  # unknown annotation
```
Checks whether the sentence could receive the given annotation.

Args

- sentence (list of str): The annotated sentence
- annotation (list of str): The annotation given for the sentence

Returns

bool, whether the annotation is possible for the sentence
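A round-trip sketch: annotations produced for a sentence should be accepted by `is_annotation`. Note that `_get_annotations` is private and is called directly here only for illustration; the public entry point lives in the `Annotator` base class, which is not shown on this page.

```python
from core.corpus.annotators import Wiktionary

annotator = Wiktionary()
inverse = annotator.get_inverse_annotator()  # InverseWiktionary

sentence = ["the", "quick", "brown", "fox"]
for annotation in annotator._get_annotations(sentence, 3):
    # is_annotation checks lowercase word overlap between the sentence
    # and the key(s) that produced the annotation.
    assert inverse.is_annotation(sentence, annotation)
```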
class Quotes

View Source

```python
class Quotes(Annotator):
    """
    Annotator using quotes from the quotes dataset.

    Dataset source: <https://github.com/ShivaliGoel/Quotes-500K>
    """

    DATAFILE = os.path.join(const.DATASET_DIR, "quotes", "data.csv")

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        if self._is_cached(suffix='map') and self._is_cached(suffix='texts'):
            self.map = self._get_cached(suffix='map')
            self.texts = self._get_cached(suffix='texts')
        else:
            self._create_map_texts()
        self._shuffle_annotations()

    def _create_map_texts(self):
        self.map = {}
        self.texts = []
        with open(Quotes.DATAFILE, "r", errors='ignore') as f:
            count = 0
            for line in csv.reader(f):
                annotation_texts = self.preprocessor.preprocess_document(line[0])
                annotation_keywords = self.preprocessor.preprocess_words(line[2])
                if sum([len(at) for at in annotation_texts]) > 4:
                    added = False
                    for annotation_keyword in annotation_keywords:
                        annotation_keyword = annotation_keyword.lower()
                        if len(annotation_keyword) > 1:
                            if annotation_keyword not in self.map:
                                self.map[annotation_keyword] = []
                            self.map[annotation_keyword].append(count)
                            added = True
                    if added:
                        self.texts.append(annotation_texts)
                        count += 1
        self._set_cached(self.map, suffix='map')
        self._set_cached(self.texts, suffix='texts')

    def _shuffle_annotations(self):
        anno_count = 0
        single_keys, del_keys = [], []
        # iterate over each key
        for key in self.map:
            self.random.shuffle(self.map[key])  # to randomize selected annotations later
            if self.use_percentages:  # splitted annotator?
                cur_len = len(self.map[key])
                if cur_len >= self.len_percentages:  # at least one annotation per percentage possible? -> split!
                    start = math.ceil(self.part_start * cur_len)
                    end = start + math.ceil(self.part_len * cur_len)
                    self.map[key] = self.map[key][start:end]
                    # remove if empty
                    if len(self.map[key]) < 1:
                        del_keys.append(key)
                else:  # store for later
                    single_keys.append(key)
            anno_count += len(self.map[key])

        # delete keys marked as empty
        for key in del_keys:
            del self.map[key]

        if self.use_percentages:
            # delete "number of percentages" from single keys each
            cur_len = len(single_keys)
            start = math.ceil(self.part_start * cur_len)
            end = start + math.ceil(self.part_len * cur_len)
            for key in single_keys[:start] + single_keys[end:]:
                del self.map[key]
                anno_count -= 1

        if anno_count < 40000:  # maximum of > 500 000 annotations if no splitting used
            print("======================")
            print("Very small annotator created, contains only", anno_count, "annotations!")
            print("======================")

        self.shuffled_keys = list(self.map.keys())
        self.random.shuffle(self.shuffled_keys)

    def _get_annotations(self, sentence, n):
        if len(sentence) <= 0:  # no words?
            return []
        if n == -1:  # "all" annotations?
            n = 1000

        sentence = list(map(lambda w: w.lower(), sentence))  # all map keys are "lowercase", should here, too

        annotations = []
        for word in sentence:
            if word in self.map:
                annotations.extend(self.map[word][:n])

        l_an = len(annotations)
        if l_an + 1 <= n:
            anno_ids = annotations[:n]
        else:
            anno_ids = [annotations[(i + i*n + self.random_seed) % l_an] for i in range(n)]

        return self._annotation_id2text(anno_ids)

    def _annotation_id2text(self, anno_ids, only_flatten=False):
        if only_flatten:
            anno_texts = anno_ids
        else:
            anno_texts = [self.texts[int(anno_id)] for anno_id in anno_ids]  # convert back to text from id
        return [[word for sentence in anno for word in sentence] for anno in anno_texts]  # make multiple sentences one (flatten inner array)

    def _get_non_annotations(self, sentence, n):
        if n == -1:  # "all" annotations?
            n = 1000

        sentence = list(map(lambda w: w.lower(), sentence))  # all map keys are "lowercase", should here, too

        # get ids not to use!
        disallowed_ids = set()
        for word in sentence:
            if word in self.map:
                disallowed_ids.update(self.map[word])

        # randomize starting index
        s_index = self.random_seed % len(sentence)
        if sentence[s_index] in self.shuffled_keys:
            k_index = self.shuffled_keys.index(sentence[s_index]) - 1
            if k_index < 0:
                k_index = len(self.shuffled_keys) - 1
        else:
            k_index = 0

        annotations = set()
        while_count = 0
        while len(annotations) < n:
            if self.shuffled_keys[k_index] not in sentence:
                annotations.update(self.map[self.shuffled_keys[k_index]])
                annotations -= disallowed_ids  # make sure to not use disallowed ones
            k_index = (k_index + len(sentence)) % len(self.shuffled_keys)
            while_count += 1
            if while_count > 1000:  # prevent endless loops
                break

        return self._annotation_id2text(annotations)[:n]

    def _get_cachename(self):
        return "quotes-" + self.preprocessor_name + "-"

    def get_inverse_annotator(self):
        return InverseQuotes(self.preprocessor)
```
Annotator using quotes from the Quotes-500K dataset.
Dataset source: https://github.com/ShivaliGoel/Quotes-500K
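For orientation, `_create_map_texts` reads column 0 of `data.csv` as the quote text and column 2 as the keywords; a hypothetical row (column 1 is presumably the author and is ignored) might look like this:

```python
import csv
import io

# Hypothetical data.csv row; only columns 0 (quote) and 2 (keywords) are used.
row = next(csv.reader(io.StringIO(
    '"Be yourself; everyone else is already taken.","Oscar Wilde","inspirational, honesty"'
)))
quote_text, keywords = row[0], row[2]
```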
Quotes(**kwargs)

View Source

```python
def __init__(self, **kwargs):
    super().__init__(**kwargs)

    if self._is_cached(suffix='map') and self._is_cached(suffix='texts'):
        self.map = self._get_cached(suffix='map')
        self.texts = self._get_cached(suffix='texts')
    else:
        self._create_map_texts()
    self._shuffle_annotations()
```
Args

- percentages (array of float): Annotators do not support splitting, but one may select only a subset of the possible annotations by giving percentages. E.g., to get one subset using 30% and one using 70%, one would write `percentages=[0.3, 0.7]` (see the slice sketch below).
- part (int): Selects one of the percentages defined by `percentages`. For `percentages=[0.3, 0.7]`, setting `part=0` selects the 30% subset and `part=1` the 70% subset.
- preprocessor (`core.corpus.preprocess.Preprocessor`): The preprocessor to use; if `None`, `core.corpus.preprocess.DefaultPreprocessor` is used.
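A sketch of the slice arithmetic used by `_shuffle_annotations` when splitting; `part_start` and `part_len` are assumed to be derived from `percentages` and `part` in the `Annotator` base class, which is not shown here:

```python
import math

percentages = [0.3, 0.7]
part = 1
part_start = sum(percentages[:part])  # 0.3 (start fraction of the selected part)
part_len = percentages[part]          # 0.7 (length fraction of the selected part)

annotations = list(range(10))  # stand-in for a shuffled annotation list
start = math.ceil(part_start * len(annotations))      # 3
end = start + math.ceil(part_len * len(annotations))  # 10
print(annotations[start:end])  # [3, 4, 5, 6, 7, 8, 9]
```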
get_inverse_annotator()

View Source

```python
def get_inverse_annotator(self):
    return InverseQuotes(self.preprocessor)
```
Get an instance of `core.corpus.annotator.InverseAnnotator` for this `Annotator`.
class InverseQuotes

View Source

```python
class InverseQuotes(InverseAnnotator):

    ANNOTATOR_CLASS = Quotes

    def _init(self, inverse_map):
        self.inverse_map = inverse_map
        try:
            self.inverse_texts
        except AttributeError:
            self.inverse_texts = self.annotator._get_cached(suffix='inversed_texts')

    def _create_cache_data(self):
        self.inverse_texts = {}
        for count, anno_text in enumerate(self.annotator.texts):
            anno_text = str(tuple(self.annotator._annotation_id2text([anno_text], only_flatten=True).pop(0)))
            if anno_text not in self.inverse_texts:
                self.inverse_texts[anno_text] = []
            self.inverse_texts[anno_text].append(count)
        self.annotator._set_cached(self.inverse_texts, suffix='inversed_texts')

        inverse_map = {}
        for keyword, anno_ids in self.annotator.map.items():
            for anno_id in anno_ids:
                anno_id = str(anno_id)
                if anno_id not in inverse_map:
                    inverse_map[anno_id] = []
                inverse_map[anno_id].append(keyword)
        return inverse_map

    def is_annotation(self, sentence, annotation):
        annotation = str(tuple(annotation))
        if annotation in self.inverse_texts:
            anno_keywords = []
            for anno_id in self.inverse_texts[annotation]:
                anno_keywords.extend(self.inverse_map[str(anno_id)])
            sentence = list(map(lambda w: w.lower(), sentence))  # all map keys are "lowercase", should here, too
            return len(set(sentence) & set(anno_keywords)) >= 1
        else:
            return False  # unknown annotation
```
To check predicted annotations, it is sometimes necessary to map back from an annotation to the text.
This class allows computing the similarity between a sentence and an SCD for a given annotator.
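The check in `is_annotation` below is a two-stage lookup; spelled out with invented inputs, it is roughly equivalent to:

```python
from core.corpus.annotators import Quotes

inverse = Quotes().get_inverse_annotator()  # InverseQuotes

sentence = ["life", "is", "short"]                  # invented input
annotation = ["make", "the", "most", "of", "life"]  # invented annotation

# Stage 1: flattened annotation text -> quote ids (inverse_texts).
ids = inverse.inverse_texts.get(str(tuple(annotation)), [])
# Stage 2: quote ids -> keywords (inverse_map); accept on keyword overlap.
keywords = {kw for i in ids for kw in inverse.inverse_map[str(i)]}
print(bool(keywords & {w.lower() for w in sentence}))
```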
is_annotation(sentence, annotation)

View Source

```python
def is_annotation(self, sentence, annotation):
    annotation = str(tuple(annotation))
    if annotation in self.inverse_texts:
        anno_keywords = []
        for anno_id in self.inverse_texts[annotation]:
            anno_keywords.extend(self.inverse_map[str(anno_id)])
        sentence = list(map(lambda w: w.lower(), sentence))  # all map keys are "lowercase", should here, too
        return len(set(sentence) & set(anno_keywords)) >= 1
    else:
        return False  # unknown annotation
```
Checks whether the sentence could receive the given annotation.

Args

- sentence (list of str): The annotated sentence
- annotation (list of str): The annotation given for the sentence

Returns

bool, whether the annotation is possible for the sentence