import numpy as np
import regex
import codecs
def clean_str(string):
    """
    Tokenization/string cleaning for all datasets except for SST.
    Original taken from https://github.com/yoonkim/CNN_sentence/blob/master/process_data.py
    """
    # Strip markup tags such as <seg id="1"> ... </seg>.
    string = regex.sub(r"<[^>]+>", "", string)
    # Keep only whitespace, Latin letters, and apostrophes. Note that this
    # already removes the digits and punctuation whitelisted below, so the
    # comma/paren/!/? rules only fire if this filter is relaxed.
    string = regex.sub(r"[^\s\p{Latin}']", "", string)
    # Replace anything outside this ASCII set (including accented Latin
    # letters such as umlauts) with a space.
    string = regex.sub(r"[^A-Za-z0-9(),!?'`]", " ", string)
    # Split common English contractions into separate tokens.
    string = regex.sub(r"'s", " 's", string)
    string = regex.sub(r"'ve", " 've", string)
    string = regex.sub(r"n't", " n't", string)
    string = regex.sub(r"'re", " 're", string)
    string = regex.sub(r"'d", " 'd", string)
    string = regex.sub(r"'ll", " 'll", string)
    # Treat remaining punctuation as separate tokens.
    string = regex.sub(r",", " , ", string)
    string = regex.sub(r"!", " ! ", string)
    string = regex.sub(r"\(", " ( ", string)
    string = regex.sub(r"\)", " ) ", string)
    string = regex.sub(r"\?", " ? ", string)
    # Collapse runs of whitespace into a single space.
    string = regex.sub(r"\s{2,}", " ", string)
    return string.strip().lower()
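
# Illustrative example (not in the original file): with the rules above,
# markup tags are stripped, contractions are split into separate tokens,
# and punctuation is removed by the Latin-only filter, e.g.
#
#     clean_str("I <b>can't</b> wait!")  ->  "i ca n't wait"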

def load_train_data(source_path, target_path, SOURCE_MAX_LENGTH=10, TARGET_MAX_LENGTH=10):
    source_sents = []
    target_sents = []
    with codecs.open(source_path, 'r', 'utf-8') as f_src, \
         codecs.open(target_path, 'r', 'utf-8') as f_tgt:
        for s, t in zip(f_src.read().split("\n"), f_tgt.read().split("\n")):
            # Skip markup/metadata lines; keep only raw sentence pairs.
            if (s and s[0] != "<") and (t and t[0] != "<"):
                # Count tokens on the cleaned sentence (raw and cleaned
                # token counts can differ) so the padding below tops every
                # sequence up to exactly SOURCE_MAX_LENGTH / TARGET_MAX_LENGTH
                # tokens.
                s_clean, t_clean = clean_str(s), clean_str(t)
                slen, tlen = len(s_clean.split()), len(t_clean.split())
                if slen < SOURCE_MAX_LENGTH and tlen < TARGET_MAX_LENGTH:
                    source_sents.append(s_clean + " _EOS_" + (" _PAD_" * (SOURCE_MAX_LENGTH - slen - 1)))
                    target_sents.append(t_clean + " _EOS_" + (" _PAD_" * (TARGET_MAX_LENGTH - tlen - 1)))
    return source_sents, target_sents
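
# Sketch of the resulting format (hypothetical 4-token sentence, default
# SOURCE_MAX_LENGTH=10): every returned string is exactly SOURCE_MAX_LENGTH
# tokens, i.e. sentence + _EOS_ + padding, e.g.
#
#     "ich liebe dich maria _EOS_ _PAD_ _PAD_ _PAD_ _PAD_ _PAD_"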

def load_test_data(source_path, target_path, SOURCE_MAX_LENGTH=10, TARGET_MAX_LENGTH=10):
    source_sents = []
    target_sents = []
    with codecs.open(source_path, 'r', 'utf-8') as f_src, \
         codecs.open(target_path, 'r', 'utf-8') as f_tgt:
        for s, t in zip(f_src.read().split("\n"), f_tgt.read().split("\n")):
            # Test sentences live inside <seg id="..."> ... </seg> lines;
            # clean_str strips the surrounding tags.
            if s.startswith("<seg") and t.startswith("<seg"):
                # As above, count tokens after cleaning so the padding
                # yields fixed-length sequences.
                s_clean, t_clean = clean_str(s), clean_str(t)
                slen, tlen = len(s_clean.split()), len(t_clean.split())
                if slen < SOURCE_MAX_LENGTH and tlen < TARGET_MAX_LENGTH:
                    source_sents.append(s_clean + " _EOS_" + (" _PAD_" * (SOURCE_MAX_LENGTH - slen - 1)))
                    target_sents.append(t_clean + " _EOS_" + (" _PAD_" * (TARGET_MAX_LENGTH - tlen - 1)))
    return source_sents, target_sents

def batch_iter(data, batch_size, num_epochs, shuffle=True):
    """
    Generates a batch iterator for a dataset.
    """
    data = np.array(data)
    data_size = len(data)
    # Ceiling division: the final batch may be smaller than batch_size.
    num_batches_per_epoch = (data_size - 1) // batch_size + 1
for epoch in range(num_epochs):
# Shuffle the data at each epoch
if shuffle:
shuffle_indices = np.random.permutation(np.arange(data_size))
shuffled_data = data[shuffle_indices]
else:
shuffled_data = data
for batch_num in range(num_batches_per_epoch):
start_index = batch_num * batch_size
end_index = min((batch_num + 1) * batch_size, data_size)
yield shuffled_data[start_index:end_index]
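
# Example usage (a minimal sketch, not from the original file): batch over
# zipped source/target pairs for one epoch; each batch is an ndarray of
# shape (batch, 2).
#
#     pairs = list(zip(source_sents, target_sents))
#     for batch in batch_iter(pairs, batch_size=64, num_epochs=1):
#         sources, targets = batch[:, 0], batch[:, 1]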

if __name__ == "__main__":
    # Smoke test: load the training corpus and report how many sentence
    # pairs survive the length filter.
    source_sents, target_sents = load_train_data("corpora/train.tags.de-en.de",
                                                 "corpora/train.tags.de-en.en")
    print("loaded %d sentence pairs" % len(source_sents))