-
Notifications
You must be signed in to change notification settings - Fork 1
/
utils.py
145 lines (119 loc) · 3.97 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import io
import gzip
import json
import logging
import os
import torch
import torch.nn.functional as F
from torchtext.data import Field, LabelField, TabularDataset, BucketIterator
def load_vectors(fname, size=None, normalize=True):
    """Load word vectors from a text file with a ``n d`` header line.

    The first line must hold two integers: the number of vectors ``n`` and
    their dimensionality ``d``. Every following line is split on single
    spaces; the first token is the word and the remaining tokens are parsed
    as floats. Files whose name ends in ``.gz`` are read through gzip.

    Args:
        fname: Path of the vector file.
        size: Maximum number of vectors to read. ``None`` reads all ``n``
            vectors; values larger than ``n`` are capped at ``n``.
        normalize: If true, subtract the per-column mean and then apply
            ``F.normalize`` (row-wise L2 normalization by default).

    Returns:
        A tuple ``(words, X)`` where ``words`` maps each word to its row
        index and ``X`` is a float tensor with one row per vector read.
    """
    is_gzip = fname.endswith('.gz')
    if is_gzip:
        # gzip streams are read as bytes and decoded line by line below.
        fin = gzip.open(fname, 'rb')
    else:
        fin = io.open(fname, 'r', encoding='utf-8', newline='\n', errors='ignore')
    with fin:
        n, d = map(int, fin.readline().split())
        # Never allocate more rows than the header declares: the surplus
        # rows would stay all-zero and skew the mean used for centering.
        size = n if size is None else min(size, n)
        words = {}
        X = torch.zeros(size, d)
        n_read = 0
        for i, line in enumerate(fin):
            if i >= size:
                break
            if is_gzip:
                line = line.decode('utf-8')
            tokens = line.rstrip().split(' ')
            words[tokens[0]] = i
            X[i] = torch.tensor([float(t) for t in tokens[1:]])
            n_read = i + 1
    if n_read < size:
        # The file held fewer vectors than requested: drop the unused rows.
        X = X[:n_read]
    if normalize:
        center = X.mean(0)
        X -= center
        X = F.normalize(X)
    return words, X
def load_docs(fname):
    """Read a text file into a list of lines with trailing whitespace removed.

    Files whose name ends in ``.gz`` are opened through gzip and each line
    is decoded as UTF-8; other files are opened as UTF-8 text with
    undecodable bytes ignored.

    Args:
        fname: Path of the file to read.

    Returns:
        A list with one right-stripped string per line of the file.
    """
    is_gzip = fname.endswith('.gz')
    if is_gzip:
        fin = gzip.open(fname, 'rb')
    else:
        fin = io.open(fname, 'r', encoding='utf-8', newline='\n', errors='ignore')
    # The context manager guarantees the handle is closed, even on error.
    with fin:
        if is_gzip:
            return [line.decode('utf-8').rstrip() for line in fin]
        return [line.rstrip() for line in fin]
def load_labels(fname):
    """Read ``fname`` with ``load_docs`` and parse every line as an int."""
    return [int(row) for row in load_docs(fname)]
def load_ranking(fname):
    """Return the first tab-separated field of every line in ``fname``."""
    first_fields = []
    for row in load_docs(fname):
        # Everything before the first tab; the whole row if there is no tab.
        head, _, _ = row.partition('\t')
        first_fields.append(head)
    return first_fields
def write_vectors(fname, X, words):
    """Write vectors to a text file with a ``n d`` header line.

    The first line holds the two dimensions of ``X``. Each following line
    is a word, a space, and the space-separated values of the matching row.

    Args:
        fname: Path of the output file; it is written as UTF-8.
        X: Two-dimensional array-like exposing ``shape`` and row iteration.
        words: Iterable of words, paired with the rows of ``X`` by ``zip``.
    """
    n, d = X.shape
    with open(fname, 'w', encoding='utf-8') as f:
        print('{} {}'.format(n, d), file=f)
        for x, word in zip(X, words):
            # Iterating a tensor row yields 0-dim tensors whose str() is
            # "tensor(...)"; convert to plain Python numbers first.
            values = x.tolist() if isinstance(x, torch.Tensor) else x
            print(word + ' ' + ' '.join(str(v) for v in values), file=f)
def twod_map(array, mapping):
    """Look up every element of a nested sequence in ``mapping``.

    Returns a new list of lists in which each element ``e`` of ``array``
    has been replaced by ``mapping[e]``.
    """
    mapped_rows = []
    for row in array:
        mapped_rows.append([mapping[item] for item in row])
    return mapped_rows
def print_2dlist_words(array):
    """Print each row of ``array`` as one line of space-joined strings."""
    for row in array:
        # Each row must be an iterable of strings for ' '.join to accept it.
        words = ' '.join(row)
        print(words)
    # Emits a newline character followed by print's own line ending.
    # NOTE(review): assumed to run once after the loop; confirm placement.
    print('\n')
def load_dct(filename):
    """Read a tab-separated file into a list of field lists.

    Each line is stripped of surrounding whitespace and split on tabs; the
    resulting lists are returned in file order.
    """
    with open(filename, 'r') as handle:
        return [raw.strip().split('\t') for raw in handle]
def pad(x, min_length=-1, pad_token='<pad>'):
    """Right-pad list ``x`` with ``pad_token`` up to ``min_length`` items.

    A list that already has at least ``min_length`` items is returned
    unchanged (the same object); otherwise a new, longer list is returned.
    """
    shortfall = min_length - len(x)
    if shortfall <= 0:
        return x
    return x + [pad_token] * shortfall
def load_data(paths, batch_size=100, min_length=-1, device='cpu'):
    """Build bucketed iterators over a list of JSON data files.

    Every file in ``paths`` is loaded as a ``TabularDataset`` with the
    fields ``id``, ``text`` and ``label``. Text examples are padded to at
    least ``min_length`` tokens via ``pad`` during preprocessing, and the
    text and label vocabularies are built over all datasets together.

    Returns:
        A tuple ``(iterators, vocab, num_classes)``: the result of
        ``BucketIterator.splits``, the text vocabulary, and the size of the
        label vocabulary.
    """
    def pad_tokens(tokens):
        return pad(tokens, min_length=min_length)

    def text_length(example):
        return len(example.text)

    id_field = Field(dtype=torch.int, sequential=False, use_vocab=False)
    text_field = Field(include_lengths=True, preprocessing=pad_tokens)
    label_field = LabelField(dtype=torch.long)

    # Map each JSON key to an attribute of the same name on the examples.
    fields = {}
    for key, field in (('id', id_field),
                       ('text', text_field),
                       ('label', label_field)):
        fields[key] = (key, field)

    datasets = []
    for path in paths:
        datasets.append(TabularDataset(path, 'json', fields))

    for field in (text_field, label_field):
        field.build_vocab(*datasets)

    iterators = BucketIterator.splits(
        datasets=datasets,
        batch_size=batch_size,
        sort_key=text_length,
        device=device,
        shuffle=True
    )
    num_classes = len(label_field.vocab)
    return iterators, text_field.vocab, num_classes
def save_embeds(embed_dir, embeds, word2idx):
    """Store embeddings and their vocabulary in ``embed_dir``.

    The directory is created if missing. ``word2idx`` is written as JSON
    to ``words.json`` and ``embeds`` is saved with ``torch.save`` to
    ``E.pt``.
    """
    os.makedirs(embed_dir, exist_ok=True)
    vocab_path = os.path.join(embed_dir, 'words.json')
    tensor_path = os.path.join(embed_dir, 'E.pt')
    with open(vocab_path, 'w') as out:
        serialized = json.dumps(word2idx)
        out.write(serialized)
    torch.save(embeds, tensor_path)
def load_embeds(embed_dir):
    """Read embeddings and their vocabulary back from ``embed_dir``.

    Loads ``E.pt`` with ``torch.load`` and ``words.json`` with the JSON
    parser, logs how many vectors were found, and returns ``(E, words)``.
    """
    tensor_path = os.path.join(embed_dir, 'E.pt')
    vocab_path = os.path.join(embed_dir, 'words.json')
    E = torch.load(tensor_path)
    with open(vocab_path, 'r') as src:
        words = json.loads(src.read())
    num_vectors = E.size()[0]
    logging.info('Found {} vectors from {}'.format(num_vectors, embed_dir))
    return E, words