# text_helpers.py
# Text Helper Functions
# ---------------------------------------
#
# Text helper functions are collected here to reduce redundant code across the scripts
import string
import os
import urllib.request
import io
import tarfile
import collections
import numpy as np
# Normalize text
def normalize_text(texts, stops):
    # Lower case
    texts = [x.lower() for x in texts]
    # Remove punctuation
    texts = [''.join(c for c in x if c not in string.punctuation) for x in texts]
    # Remove numbers
    texts = [''.join(c for c in x if c not in '0123456789') for x in texts]
    # Remove stopwords
    texts = [' '.join(word for word in x.split() if word not in stops) for x in texts]
    # Trim extra whitespace
    texts = [' '.join(x.split()) for x in texts]
    return texts
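# Illustrative example of normalize_text (made-up input, not from the original scripts):
#   normalize_text(['The Cat, sat on 2 mats!'], stops=['the', 'on'])
#   -> ['cat sat mats']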
# Build dictionary of words
def build_dictionary(sentences, vocabulary_size):
    # Turn sentences (list of strings) into lists of words
    split_sentences = [s.split() for s in sentences]
    words = [x for sublist in split_sentences for x in sublist]
    # Initialize list of [word, word_count] pairs, starting with the unknown token
    count = [['RARE', -1]]
    # Add the most frequent words, limited to the N most frequent (N = vocabulary size)
    count.extend(collections.Counter(words).most_common(vocabulary_size - 1))
    # Now create the dictionary: each word gets an index equal to the dictionary
    # length at the time it is added, i.e. its rank by frequency (RARE is index 0)
    word_dict = {}
    for word, word_count in count:
        word_dict[word] = len(word_dict)
    return word_dict
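# Illustrative example of build_dictionary (made-up input):
#   build_dictionary(['the cat sat', 'the cat ran'], vocabulary_size=3)
#   -> {'RARE': 0, 'the': 1, 'cat': 2}
# Words outside the top vocabulary_size - 1 frequencies (here 'sat' and 'ran')
# are left out and will later map to the RARE index.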
# Turn text data into lists of integers from dictionary
def text_to_numbers(sentences, word_dict):
    # Initialize the returned data
    data = []
    for sentence in sentences:
        sentence_data = []
        # For each word, either use its index or the rare-word index (0)
        for word in sentence.split():
            if word in word_dict:
                word_ix = word_dict[word]
            else:
                word_ix = 0
            sentence_data.append(word_ix)
        data.append(sentence_data)
    return data
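# Illustrative example of text_to_numbers (made-up input):
#   with word_dict == {'RARE': 0, 'the': 1, 'cat': 2},
#   text_to_numbers(['the cat sat'], word_dict) -> [[1, 2, 0]]   ('sat' falls back to the RARE index 0)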
# Generate data randomly (N words behind, target, N words ahead)
def generate_batch_data(sentences, batch_size, window_size, method='skip_gram'):
    # Fill up data batch
    batch_data = []
    label_data = []
    while len(batch_data) < batch_size:
        # Select a random sentence to start
        rand_sentence_ix = int(np.random.choice(len(sentences)))
        rand_sentence = sentences[rand_sentence_ix]
        # Generate consecutive windows to look at
        window_sequences = [rand_sentence[max((ix - window_size), 0):(ix + window_size + 1)]
                            for ix, x in enumerate(rand_sentence)]
        # Denote which element of each window is the center word of interest
        label_indices = [ix if ix < window_size else window_size for ix, x in enumerate(window_sequences)]
        # Pull out the center word of interest for each window and create a tuple for each window
        if method == 'skip_gram':
            batch_and_labels = [(x[y], x[:y] + x[(y + 1):]) for x, y in zip(window_sequences, label_indices)]
            # Make it into a big list of tuples (target word, surrounding word)
            tuple_data = [(x, y_) for x, y in batch_and_labels for y_ in y]
            # Skip degenerate sentences (e.g. single-word) that yield no pairs
            if not tuple_data:
                continue
            batch, labels = [list(x) for x in zip(*tuple_data)]
        elif method == 'cbow':
            batch_and_labels = [(x[:y] + x[(y + 1):], x[y]) for x, y in zip(window_sequences, label_indices)]
            # Only keep windows with a consistent size of 2 * window_size
            batch_and_labels = [(x, y) for x, y in batch_and_labels if len(x) == 2 * window_size]
            # Skip sentences too short to form a full window
            if not batch_and_labels:
                continue
            batch, labels = [list(x) for x in zip(*batch_and_labels)]
        elif method == 'doc2vec':
            # For doc2vec we keep only the LHS window to predict the target word
            batch_and_labels = [(rand_sentence[i:i + window_size], rand_sentence[i + window_size])
                                for i in range(0, len(rand_sentence) - window_size)]
            # Skip sentences too short to form a full window
            if not batch_and_labels:
                continue
            batch, labels = [list(x) for x in zip(*batch_and_labels)]
            # Add the document index to each batch row; remember that the last
            # element of each row is the doc index
            batch = [x + [rand_sentence_ix] for x in batch]
        else:
            raise ValueError('Method {} not implemented yet.'.format(method))
        # Extract batch and labels
        batch_data.extend(batch[:batch_size])
        label_data.extend(labels[:batch_size])
    # Trim batch and label at the end
    batch_data = batch_data[:batch_size]
    label_data = label_data[:batch_size]
    # Convert to numpy arrays
    batch_data = np.array(batch_data)
    label_data = np.transpose(np.array([label_data]))
    return batch_data, label_data
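# Note on generate_batch_data output shapes (for batch_size B and window_size W):
#   skip_gram: batch (B,) of target word indices,           labels (B, 1) of surrounding-word indices
#   cbow:      batch (B, 2*W) of surrounding-word indices,  labels (B, 1) of target word indices
#   doc2vec:   batch (B, W + 1) of LHS window plus doc index, labels (B, 1) of target word indices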
# Load the movie review data
# Check if data was downloaded, otherwise download it and save for future use
def load_movie_data(data_folder_name):
    pos_file = os.path.join(data_folder_name, 'data.pos')
    neg_file = os.path.join(data_folder_name, 'target.neg')
    # Check if files are already downloaded
    if os.path.isfile(pos_file):
        pos_data = []
        with open(pos_file, 'r', encoding="latin-1") as temp_pos_file:
            for row in temp_pos_file:
                pos_data.append(row)
        neg_data = []
        with open(neg_file, 'r', encoding="latin-1") as temp_neg_file:
            for row in temp_neg_file:
                neg_data.append(row)
    else:  # If not downloaded, download and save
        print('Movie review data not found, downloading it now.')
        movie_data_url = 'http://www.cs.cornell.edu/people/pabo/movie-review-data/rt-polaritydata.tar.gz'
        stream_data = urllib.request.urlopen(movie_data_url)
        tmp = io.BytesIO()
        while True:
            s = stream_data.read(16384)
            if not s:
                break
            tmp.write(s)
        stream_data.close()
        tmp.seek(0)
        tar_file = tarfile.open(fileobj=tmp, mode="r:gz")
        pos = tar_file.extractfile('rt-polaritydata/rt-polarity.pos')
        neg = tar_file.extractfile('rt-polaritydata/rt-polarity.neg')
        # Save pos/neg reviews
        pos_data = []
        for line in pos:
            pos_data.append(line.decode('ISO-8859-1').encode('ascii', errors='ignore').decode())
        neg_data = []
        for line in neg:
            neg_data.append(line.decode('ISO-8859-1').encode('ascii', errors='ignore').decode())
        tar_file.close()
        # Write to file
        if not os.path.exists(data_folder_name):
            os.makedirs(data_folder_name)
        # Save files
        with open(pos_file, "w") as pos_file_handler:
            pos_file_handler.write(''.join(pos_data))
        with open(neg_file, "w") as neg_file_handler:
            neg_file_handler.write(''.join(neg_data))
    texts = pos_data + neg_data
    target = [1] * len(pos_data) + [0] * len(neg_data)
    return texts, target
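# Typical use (illustrative folder name): texts, target = load_movie_data('temp')
# On the first call this downloads the rt-polaritydata archive into the folder and
# caches it as data.pos / target.neg; later calls reuse the cached files.
# target is 1 for positive reviews and 0 for negative reviews.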
def load_product_data():
    texts = []
    # Cap on the number of rows to read; set very high to effectively read everything
    tot_data = 10000000000000
    # tot_data = 100
    with open("./data/dataTitle.txt", "r") as fp:
        for row in fp:
            texts.append(row)
            if len(texts) >= tot_data:
                break
    target = []
    with open("./data/dataDimesions.txt", "r") as fp:
        for row in fp:
            tmp = row.split(" ")
            # Each row holds four numeric dimension values
            tmpL = [float(tmp[0]), float(tmp[1]), float(tmp[2]), float(tmp[3])]
            target.append(tmpL)
            if len(target) >= tot_data:
                break
    print(len(texts), len(target))
    return texts, target
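# Minimal smoke test of the pure text pipeline (no downloads, no local data files).
# The sentences, stopwords, and parameters below are made-up illustrations, not part
# of the original data sets.
if __name__ == '__main__':
    sample_texts = ['The cat sat on the mat!', 'A dog ate 2 bones.', 'Cats and dogs play.']
    sample_stops = ['the', 'a', 'on', 'and']
    cleaned = normalize_text(sample_texts, sample_stops)       # e.g. first sentence -> 'cat sat mat'
    word_dict = build_dictionary(cleaned, vocabulary_size=10)  # 'RARE' plus the top 9 words
    numbered = text_to_numbers(cleaned, word_dict)             # lists of word indices
    batch, labels = generate_batch_data(numbered, batch_size=4, window_size=1, method='skip_gram')
    print(cleaned)
    print(word_dict)
    print(numbered)
    print(batch.shape, labels.shape)  # expected: (4,) and (4, 1)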