core.model.transformer.datasets
View Source
import math

from core.model.transformer.dataset import Dataset
from core.utils import calc_metrics, Random, interval_similarity
View Source
class NextSentenceDataset(Dataset):
    '''
    BERT data set for "Is the next sentence an SCD?"
    '''

    def _do_tokenize(self, s_1, s_2, label):
        data = self.tokenizer(
            s_1, s_2,
            is_split_into_words=True,
            truncation='longest_first',
            padding='max_length',
            max_length=self.max_len
        )
        data['labels'] = label
        return data

    def _load_and_tokenize(self):
        # get the max length
        max_len = 0
        last_sentence = None
        for sentence, scds in self.annotated_corpus.iterate_sentence_scds(n=1):
            max_len = max(
                max_len,
                self._tokenized_len(sentence)
                + (self._tokenized_len(scds[0]) if len(scds) == 1 else 0)
                + 3,  # add three special tokens
                self._tokenized_len(sentence)
                + (self._tokenized_len(last_sentence) if last_sentence is not None else 0)
                + 3
            )
            last_sentence = sentence

        # check model max len
        self.max_len = min(NextSentenceDataset.MAX_INPUT_LEN, max_len)

        local_random = Random.get_generator()

        # iterate
        last_sentence = None
        data_cache = []
        for sentence, scds in self.annotated_corpus.iterate_sentence_scds(n=1):
            # We do no chunking here! Sentence and SCD will be truncated at the end
            # to match the max length requirement, so some part of the sentence/SCD
            # may be omitted!

            # create "sentence, scd" and "last_sentence, current sentence"
            if len(scds) == 1:
                # label == 0 => sequence B is the continuation of A
                data_cache.append(self._do_tokenize(sentence, scds[0], 0))
            if last_sentence is not None:
                # label == 1 => sequence B is a random sequence
                data_cache.append(self._do_tokenize(last_sentence, sentence, 1))
            last_sentence = sentence

            # shuffle cache and add as chunk
            if len(data_cache) > 100:
                local_random.shuffle(data_cache)
                for d in data_cache:
                    self._add(d)
                data_cache = []

        # add the rest
        local_random.shuffle(data_cache)
        for d in data_cache:
            self._add(d)

    def compute_metrics(self, eval_result):
        return calc_metrics(
            # flip 1 -> 0 and 0 -> 1 (labels: next_sentence = 0, random_sentence = 1)
            (eval_result.label_ids - 1) * -1,
            (eval_result.predictions.argmax(-1) - 1) * -1
        )
BERT data set for "Is next sentence a SCD?"
View Source
def compute_metrics(self, eval_result):
    return calc_metrics(
        # flip 1 -> 0 and 0 -> 1 (labels: next_sentence = 0, random_sentence = 1)
        (eval_result.label_ids - 1) * -1,
        (eval_result.predictions.argmax(-1) - 1) * -1
    )
Function to calculate metrics from the predictions returned by the model.
Use with transformers.Trainer as compute_metrics=Dataset.compute_metrics.
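The label inversion in this override can be checked in isolation; a minimal sketch with made-up values:

import numpy as np

labels = np.array([0, 1, 0, 1])  # 0 = matching pair, 1 = random pair
flipped = (labels - 1) * -1      # -> array([1, 0, 1, 0])
assert (flipped == 1 - labels).all()  # same mapping; the matching pair becomes the positive class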
View Source
class ClassificationDataset(Dataset):
    '''
    BERT data set for "Is the (current) sentence an SCD?"
    '''

    def _load_and_tokenize(self):
        # get the max length
        max_len = 0
        for sentences, _ in self.annotated_corpus.iterate_inline_scd_texts():
            max_curr = max(map(lambda s: self._tokenized_len(s), sentences)) + 2  # add two special tokens
            max_len = max(max_len, max_curr)

        # check model max len
        max_len = min(ClassificationDataset.MAX_INPUT_LEN, max_len)

        local_random = Random.get_generator()

        # iterate
        for sentences, is_scds in self.annotated_corpus.iterate_inline_scd_texts():
            sentences_labels = list(zip(sentences, is_scds))
            local_random.shuffle(sentences_labels)

            for sentence, is_scd in sentences_labels:
                # chunk sentences (by word count; the tokenizer truncates any overflow)
                chunked_sentence = []
                for i in range(math.ceil(len(sentence) / ClassificationDataset.MAX_INPUT_LEN)):
                    chunked_sentence.append(
                        sentence[i * ClassificationDataset.MAX_INPUT_LEN:(i + 1) * ClassificationDataset.MAX_INPUT_LEN]
                    )

                # add chunks
                for s in chunked_sentence:
                    data = self.tokenizer(
                        s,
                        is_split_into_words=True,
                        truncation='longest_first',
                        padding='max_length',
                        max_length=max_len
                    )
                    data['labels'] = 1 if is_scd else 0
                    self._add(data)

    def compute_metrics(self, eval_result):
        return calc_metrics(
            eval_result.label_ids,
            eval_result.predictions.argmax(-1)
        )
BERT data set for "Is the (current) sentence a SCD?"
View Source
def compute_metrics(self, eval_result):
    return calc_metrics(
        eval_result.label_ids,
        eval_result.predictions.argmax(-1)
    )
Function to calculate metrics from the predictions returned by the model.
Use with transformers.Trainer as compute_metrics=Dataset.compute_metrics.
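A hypothetical wiring into transformers.Trainer, as suggested above; train_ds and eval_ds stand for already-constructed ClassificationDataset instances (their constructor is defined in the Dataset base class, which is not shown here):

from transformers import BertForSequenceClassification, Trainer, TrainingArguments

model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=2)

trainer = Trainer(
    model=model,
    args=TrainingArguments(output_dir='scd-classifier'),  # made-up output path
    train_dataset=train_ds,                   # hypothetical ClassificationDataset
    eval_dataset=eval_ds,                     # hypothetical ClassificationDataset
    compute_metrics=eval_ds.compute_metrics,  # the method documented above
)
trainer.train()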
View Source
class MultipleChoiceDataset(Dataset):
    '''
    BERT data set for "Given a sentence and a selection of SCDs, select the best."
    '''

    NUM_CHOICES = 4
    '''
    Defines the number of SCDs from which the system has to choose the matching one.

    Change with caution -- it will **not** be recognized by the model cache!
    '''

    BATCH_SIZE_INNER = NUM_CHOICES
    """
    Each item contains one row of "input_ids" per possible choice.
    Calculated based on `MultipleChoiceDataset.NUM_CHOICES`.
    """

    def _load_and_tokenize(self):
        # calc max len
        max_len = 0
        for sentence, scds, non_scds in self.annotated_corpus.iterate_sentence_scds_non_scds(
                matching_n=1, non_matching_n=MultipleChoiceDataset.NUM_CHOICES - 1):
            if len(scds) == 1 and len(non_scds) > 0:
                max_len = max(
                    max_len,
                    self._tokenized_len(sentence)
                    + max(map(lambda s: self._tokenized_len(s), non_scds))
                    + 3,  # add three special tokens
                    self._tokenized_len(sentence) + self._tokenized_len(scds[0]) + 3
                )

        # check model max len
        max_len = min(MultipleChoiceDataset.MAX_INPUT_LEN, max_len)

        local_random = Random.get_generator()

        # iterate
        for sentence, scds, non_scds in self.annotated_corpus.iterate_sentence_scds_non_scds(
                matching_n=1, non_matching_n=MultipleChoiceDataset.NUM_CHOICES - 1):
            # We do no chunking here! Sentence and SCD will be truncated at the end
            # to match the max length requirement, so some part of the sentence/SCD
            # may be omitted!

            # found something?
            if len(scds) == 1 and len(non_scds) > 0:
                cur_num_choices = len(non_scds) + 1
                correct_i = local_random.randrange(cur_num_choices)
                answers = [
                    non_scds.pop(0) if i != correct_i else scds[0]
                    for i in range(cur_num_choices)
                ]

                data = self.tokenizer(
                    [sentence] * cur_num_choices,
                    answers,
                    is_split_into_words=True,
                    truncation='longest_first',
                    padding='max_length',
                    max_length=max_len,
                )
                data['labels'] = correct_i
                self._add(data)

    def compute_metrics(self, eval_result):
        predictions = eval_result.predictions.argmax(-1)
        labels = eval_result.label_ids

        count_correct = (predictions == labels).sum()
        count_all = len(labels)

        return {
            'accuracy': count_correct / count_all,
            'num': count_all
        }
BERT data set for "Given a sentence and a selection of SCDs, select the best."
Defines the number of SCDs from which the system has to choose the matching one.
Change with caution -- it will not be recognized by the model cache!
Each item contains one row of "input_ids" per possible choice.
Calculated based on MultipleChoiceDataset.NUM_CHOICES.
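How the correct SCD is placed among the distractors can be reproduced in isolation; the strings are made up, and random.Random stands in for core.utils.Random:

import random

rng = random.Random(0)  # stand-in for Random.get_generator()
scds = ['the matching scd']
non_scds = ['distractor 1', 'distractor 2', 'distractor 3']

cur_num_choices = len(non_scds) + 1  # equals NUM_CHOICES when enough distractors exist
correct_i = rng.randrange(cur_num_choices)
answers = [
    non_scds.pop(0) if i != correct_i else scds[0]
    for i in range(cur_num_choices)
]
assert answers.index('the matching scd') == correct_i  # the label points at the correct slot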
View Source
def compute_metrics(self, eval_result):
    predictions = eval_result.predictions.argmax(-1)
    labels = eval_result.label_ids

    count_correct = (predictions == labels).sum()
    count_all = len(labels)

    return {
        'accuracy': count_correct / count_all,
        'num': count_all
    }
Function to calculate metrics from the predictions returned by the model.
Use with transformers.Trainer as compute_metrics=Dataset.compute_metrics.
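A small check of the accuracy computation; transformers.EvalPrediction mimics the eval_result object, and the logits (shape: items x NUM_CHOICES) are made up:

import numpy as np
from transformers import EvalPrediction

eval_result = EvalPrediction(
    predictions=np.array([[0.1, 0.7, 0.1, 0.1],    # argmax -> 1 (matches the label)
                          [0.6, 0.2, 0.1, 0.1]]),  # argmax -> 0 (label is 3)
    label_ids=np.array([1, 3]),
)
predictions = eval_result.predictions.argmax(-1)
accuracy = (predictions == eval_result.label_ids).sum() / len(eval_result.label_ids)
assert accuracy == 0.5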
View Source
class QuestionAnswerDataset(Dataset):
    '''
    BERT data set for "Given a sentence (= text/SCD) and the text of other
    sentences (= SCD/text), which of them match?"

    See `QuestionTextAnswerSCDDataset` and `QuestionSCDAnswerTextDataset` for the
    two directions.

    **Do not use** this class directly; use the subclasses!
    '''

    def _load_and_tokenize(self):
        local_random = Random.get_generator()

        for sentences, scds, mapping_t_s, mapping_s_t in self.annotated_corpus.iterate_assign_scds_text():
            # ask a text sentence and select the matching SCD, or the other way round?
            if type(self)._TEXT_SCDS:
                questions = sentences
                answers = scds
                mapping = mapping_t_s
            else:
                questions = scds
                answers = sentences
                mapping = mapping_s_t

            # create chunk
            for i, question in enumerate(questions):
                rest_len = QuestionAnswerDataset.MAX_INPUT_LEN - self._tokenized_len(question) - 3  # three special tokens!

                correct_answer = answers[mapping[i]]
                rest_len -= self._tokenized_len(correct_answer)
                presented_answers = [correct_answer]

                iter_list = list(range(len(answers)))
                local_random.shuffle(iter_list)

                num_incorrect = 0
                for j in iter_list:
                    if rest_len <= 0:
                        # stop once the model's max input size is filled up
                        break
                    if j != mapping[i] and answers[j] not in presented_answers:
                        # only add new false answers!
                        presented_answers.append(answers[j])
                        rest_len -= self._tokenized_len(answers[j]) + 1  # + 1 for the dot appended below
                        num_incorrect += 1

                if num_incorrect > 0:  # a single (the correct) choice makes no sense!
                    local_random.shuffle(presented_answers)

                    start_i = 0
                    presented_text = []
                    for answer in presented_answers:
                        if answer == correct_answer:
                            start_i = self._tokenized_len(presented_text)
                        presented_text.extend(answer)
                        presented_text.append('.')

                    data = self.tokenizer(
                        question,
                        presented_text[:-1],  # remove the last dot
                        is_split_into_words=True,
                        truncation='longest_first',
                        padding='max_length',
                        max_length=QuestionAnswerDataset.MAX_INPUT_LEN,
                    )

                    data['start_positions'] = data['input_ids'].index(self.tokenizer.sep_token_id) + 1 + start_i  # ([question] [SEP]) + start_i
                    data['end_positions'] = data['start_positions'] + self._tokenized_len(correct_answer)

                    # end position must not be out of the input range!
                    if self.tokenizer.pad_token_id in data['input_ids']:
                        last_index = data['input_ids'].index(self.tokenizer.pad_token_id) - 2  # [LAST] [SEP] "index [PAD]"
                    else:
                        last_index = len(data['input_ids']) - 2  # [LAST] [SEP] "len"
                    if data['end_positions'] > last_index:
                        # take until the last possible position
                        data['end_positions'] = last_index

                    self._add(data)

    def compute_metrics(self, eval_result):
        # get lists of (start, end)
        predictions = zip(eval_result.predictions[0].argmax(-1), eval_result.predictions[1].argmax(-1))
        labels = zip(eval_result.label_ids[0], eval_result.label_ids[1])

        count_all = len(eval_result.label_ids[0])
        count_equal = 0
        count_include = 0
        count_part = 0
        similarity_sum = 0

        for prediction, label in zip(predictions, labels):
            if prediction[0] == label[0] and prediction[1] == label[1]:
                # same interval
                count_equal += 1
            if prediction[0] <= label[0] and prediction[1] >= label[1]:
                # predicted interval is larger and includes the correct one
                count_include += 1
            if prediction[0] >= label[0] and prediction[1] <= label[1]:
                # predicted interval is smaller and included in the correct one
                count_part += 1
            similarity_sum += interval_similarity(*prediction, *label)

        return {
            'accuracy': count_equal / count_all,
            'accuracy_include': count_include / count_all,
            'accuracy_part': count_part / count_all,
            'avg_similarity': similarity_sum / count_all,
            'num': count_all
        }
BERT data set for "Given a sentence (=text/ scd) and text of other sentences (=scd/ text) which of them match?"
See QuestionTextAnswerSCDDataset
and QuestionSCDAnswerTextDataset
for the
two directions. Do not use this class, use the subclasses!
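The span bookkeeping (start_positions pointing just behind the first [SEP]) can be sketched with a real tokenizer; the words are made up, and the example assumes bert-base-uncased, where each of these words maps to a single WordPiece token:

from transformers import BertTokenizerFast

tokenizer = BertTokenizerFast.from_pretrained('bert-base-uncased')

question = ['which', 'answer', 'matches', '?']
presented_text = ['wrong', 'answer', '.', 'right', 'answer']  # dots separate the answers

data = tokenizer(question, presented_text, is_split_into_words=True)

first_sep = data['input_ids'].index(tokenizer.sep_token_id)
start = first_sep + 1 + 3  # 3 = tokenized length of everything before 'right'
assert tokenizer.convert_ids_to_tokens(data['input_ids'][start]) == 'right'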
View Source
def compute_metrics(self, eval_result):
    # get lists of (start, end)
    predictions = zip(eval_result.predictions[0].argmax(-1), eval_result.predictions[1].argmax(-1))
    labels = zip(eval_result.label_ids[0], eval_result.label_ids[1])

    count_all = len(eval_result.label_ids[0])
    count_equal = 0
    count_include = 0
    count_part = 0
    similarity_sum = 0

    for prediction, label in zip(predictions, labels):
        if prediction[0] == label[0] and prediction[1] == label[1]:
            # same interval
            count_equal += 1
        if prediction[0] <= label[0] and prediction[1] >= label[1]:
            # predicted interval is larger and includes the correct one
            count_include += 1
        if prediction[0] >= label[0] and prediction[1] <= label[1]:
            # predicted interval is smaller and included in the correct one
            count_part += 1
        similarity_sum += interval_similarity(*prediction, *label)

    return {
        'accuracy': count_equal / count_all,
        'accuracy_include': count_include / count_all,
        'accuracy_part': count_part / count_all,
        'avg_similarity': similarity_sum / count_all,
        'num': count_all
    }
Function to calculate metrics from the predictions returned by the model.
Use with transformers.Trainer as compute_metrics=Dataset.compute_metrics.
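The three interval relations counted above, checked on a made-up prediction/label pair (interval_similarity itself lives in core.utils and is not shown):

prediction = (10, 20)  # predicted (start, end) token positions
label = (12, 18)       # gold (start, end)

equal = prediction == label                                        # same interval
include = prediction[0] <= label[0] and prediction[1] >= label[1]  # prediction covers gold
part = prediction[0] >= label[0] and prediction[1] <= label[1]     # prediction inside gold
assert (equal, include, part) == (False, True, False)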
View Source
class QuestionTextAnswerSCDDataset(QuestionAnswerDataset):
    '''
    BERT data set for "Given a sentence from the text and SCDs, which SCD matches the sentence?"
    '''

    _TEXT_SCDS = True  # ask a text sentence and select the matching SCD
BERT data set for "Given a sentence from text and scds which scd matches sentence?"
View Source
class QuestionSCDAnswerTextDataset(QuestionAnswerDataset):
    '''
    BERT data set for "Given an SCD and a text, which sentence from the text matches the SCD?"
    '''

    _TEXT_SCDS = False  # ask an SCD and select the matching text sentence
BERT data set for "Given a scd and text which sentence from text matches scd?"