
[Data Analysis] Chinese Text Classification with FastText (PyTorch)

Abstract: Chinese text classification with a FastText model implemented in PyTorch.
Code: https://github.com/649453932/Chinese-Text-Classification-Pytorch

Introduction

Since my studies require text-analysis models, I worked through the code of a GitHub repository to implement the text-classification pipeline and added my study notes as comments. Corrections are welcome if anything is wrong.
Code: https://github.com/649453932/Chinese-Text-Classification-Pytorch

Project structure

![Project structure](https://img-blog.csdnimg.cn/20200612180633686.PNG)
runFastText is the main entry script, train_eval contains the training and evaluation routines, and utils_fastTextTest handles data processing. The file names differ from those in the original GitHub repo, but the code is largely the same.
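
The data files read by utils_fastTextTest.py are plain-text files with one sample per line, the sentence and its numeric class label separated by a tab (this is exactly how build_dataset() below splits each line). A minimal parsing sketch, with a made-up sample line (real label indices map to class names via THUCNews/data/class.txt):

```python
# Assumed THUCNews data format: "<sentence>\t<class index>", one sample per line.
# The sample line below is invented for illustration only.
line = "皇马官方宣布完成新援签约\t7\n"
content, label = line.strip().split('\t')
print(content)     # the raw sentence that goes to the tokenizer
print(int(label))  # the numeric class label used for training
```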

utils_fastTextTest.py

# coding: UTF-8
import os
import torch
import numpy as np
import pickle as pkl
from tqdm import tqdm
import time
from datetime import timedelta

MAX_VOCAB_SIZE = 10000  # upper bound on the vocabulary size
UNK, PAD = '<UNK>', '<PAD>'  # unknown token and padding token


def build_vocab(file_path, tokenizer, max_size, min_freq):
    """
    Build a vocabulary:
    split every sentence in the dataset into characters/space-separated words and count how often each element occurs;
    keep the elements whose frequency is at least min_freq, sort them by frequency in descending order and take the top max_size;
    finally build the dict vocab_dic {element: index} in that order, with '<UNK>' and '<PAD>' appended as the last two entries.
    """
    vocab_dic = {}
    with open(file_path, 'r', encoding='UTF-8') as f:
        # tqdm is a progress-bar package; it shows the loop progress in the terminal
        for line in tqdm(f):  # process the file line by line
            lin = line.strip()  # strip leading/trailing whitespace and newlines
            if not lin:  # skip empty lines
                continue
            content = lin.split('\t')[0]  # sentence and label are tab-separated: the sentence comes first, the label after (lin.split('\t')[1])
            for word in tokenizer(content):  # split by space or by character, depending on the tokenizer
                vocab_dic[word] = vocab_dic.get(word, 0) + 1  # count word/character frequencies over the training set
    # Keep the elements whose frequency is at least min_freq (=1 here), sort them by frequency in descending
    # order and take the top max_size (=10000); vocab_list holds (word, freq) tuples.
    # sorted() usage: sorted(iterable, key=None, reverse=False)
    #   iterable -- any iterable object.
    #   key      -- a one-argument function applied to each element to produce the sort key.
    #   reverse  -- reverse=True sorts in descending order, reverse=False (default) in ascending order.
    vocab_list = sorted([_ for _ in vocab_dic.items() if _[1] >= min_freq], key=lambda x: x[1], reverse=True)[:max_size]
    vocab_dic = {word_count[0]: idx for idx, word_count in enumerate(vocab_list)}
    # Append two extra entries at the end of vocab_dic: {'<UNK>': len(vocab_dic)} and {'<PAD>': len(vocab_dic) + 1}
    vocab_dic.update({UNK: len(vocab_dic), PAD: len(vocab_dic) + 1})
    return vocab_dic


def build_dataset(config, ues_word):
    """
    Load the datasets:
    for every line, separate the sentence from its label;
    tokenize the sentence (by space or by character) and pad or truncate it to pad_size;
    map the tokens to their vocabulary indices to obtain words_line;
    finally collect everything into a list whose elements are (words_line, int(label), seq_len, ...).
    """
    if ues_word:
        tokenizer = lambda x: x.split(' ')  # split on spaces: word-level
    else:
        tokenizer = lambda x: [y for y in x]  # char-level
    # if os.path.exists(config.vocab_path):
    #     vocab = pkl.load(open(config.vocab_path, 'rb'))
    # else:
    # here the vocabulary is always built from the training set
    vocab = build_vocab(config.train_path, tokenizer=tokenizer, max_size=MAX_VOCAB_SIZE, min_freq=1)
    # and saved as a pickle afterwards
    pkl.dump(vocab, open(config.vocab_path, 'wb'))
    print(f"Vocab size: {len(vocab)}")

    def biGramHash(sequence, t, buckets):
        t1 = sequence[t - 1] if t - 1 >= 0 else 0
        return (t1 * 14918087) % buckets

    def triGramHash(sequence, t, buckets):
        t1 = sequence[t - 1] if t - 1 >= 0 else 0
        t2 = sequence[t - 2] if t - 2 >= 0 else 0
        return (t2 * 14918087 * 18408749 + t1 * 14918087) % buckets

    def load_dataset(path, pad_size=32):
        contents = []
        with open(path, 'r', encoding='UTF-8') as f:
            for line in tqdm(f):  # read the data file line by line
                lin = line.strip()  # strip leading/trailing whitespace and newlines
                if not lin:  # skip empty lines
                    continue
                content, label = lin.split('\t')  # sentence and label are tab-separated: sentence first, label after
                words_line = []  # the sentence converted to vocabulary indices
                token = tokenizer(content)  # split the sentence by space or by character
                seq_len = len(token)  # number of tokens after splitting
                if pad_size:  # if a padding length is given
                    if len(token) < pad_size:
                        token.extend([PAD] * (pad_size - len(token)))  # pad up to pad_size
                    else:
                        token = token[:pad_size]  # truncate to pad_size
                        seq_len = pad_size  # update the token count
                # word to id
                for word in token:
                    # look up the token's index in the vocabulary and append it to words_line;
                    # tokens that are not in the vocabulary get the index of '<UNK>'
                    words_line.append(vocab.get(word, vocab.get(UNK)))

                # fasttext ngram
                buckets = config.n_gram_vocab
                bigram = []
                trigram = []
                # ------ngram------
                for i in range(pad_size):
                    bigram.append(biGramHash(words_line, i, buckets))
                    trigram.append(triGramHash(words_line, i, buckets))
                # -----------------
                # store one tuple per sample: (words_line, numeric label, token count, bigram, trigram)
                contents.append((words_line, int(label), seq_len, bigram, trigram))
        return contents  # [([...], 0), ([...], 1), ...]

    train = load_dataset(config.train_path, config.pad_size)
    dev = load_dataset(config.dev_path, config.pad_size)
    test = load_dataset(config.test_path, config.pad_size)
    return vocab, train, dev, test


class DatasetIterater(object):
    """
    Yields batches from a dataset.
    Note that _to_tensor() packs each batch as `(x, seq_len, bigram, trigram), y`,
    where x is words_line, seq_len is the pre-padding length (capped at pad_size) and y is the label.
    """
    # `batches` is the contents list produced by load_dataset() inside build_dataset():
    # (words_line, int(label), seq_len, bigram, trigram)
    def __init__(self, batches, batch_size, device):
        self.batch_size = batch_size  # number of sentences per batch
        self.batches = batches  # the dataset
        self.n_batches = len(batches) // batch_size  # number of full batches
        self.residue = False  # True if a smaller leftover batch remains after the full batches ('residue' = what is left over)
        if len(batches) % self.n_batches != 0:
            self.residue = True
        self.index = 0  # iteration index
        self.device = device

    def _to_tensor(self, datas):
        # xx = [xxx[2] for xxx in datas]
        # indexx = np.argsort(xx)[::-1]
        # datas = np.array(datas)[indexx]
        x = torch.LongTensor([_[0] for _ in datas]).to(self.device)  # the sentences (words_line)
        y = torch.LongTensor([_[1] for _ in datas]).to(self.device)  # the labels
        bigram = torch.LongTensor([_[3] for _ in datas]).to(self.device)
        trigram = torch.LongTensor([_[4] for _ in datas]).to(self.device)

        # pre-padding length (capped at pad_size; shorter sentences keep their original length)
        seq_len = torch.LongTensor([_[2] for _ in datas]).to(self.device)
        return (x, seq_len, bigram, trigram), y

    def __next__(self):
        if self.residue and self.index == self.n_batches:  # leftover samples exist and we have reached the final (partial) batch
            batches = self.batches[self.index * self.batch_size: len(self.batches)]  # take all remaining samples
            self.index += 1
            batches = self._to_tensor(batches)
            return batches

        elif self.index >= self.n_batches:
            self.index = 0
            raise StopIteration
        else:  # the normal path; self.index starts at 0, which is below self.n_batches
            batches = self.batches[self.index * self.batch_size: (self.index + 1) * self.batch_size]  # take one full batch
            self.index += 1
            batches = self._to_tensor(batches)  # convert to tensors
            return batches

    def __iter__(self):
        return self

    def __len__(self):
        if self.residue:
            return self.n_batches + 1
        else:
            return self.n_batches


def build_iterator(dataset, config):  # `dataset` is one of the splits returned by build_dataset() (train, dev or test)
    iter = DatasetIterater(dataset, config.batch_size, config.device)
    return iter


def get_time_dif(start_time):
    """Return the time elapsed since start_time."""
    end_time = time.time()
    time_dif = end_time - start_time
    return timedelta(seconds=int(round(time_dif)))


if __name__ == "__main__":
    '''Extract the pretrained word vectors for the vocabulary.'''
    vocab_dir = "./THUCNews/data/vocab.pkl"
    pretrain_dir = "./THUCNews/data/sgns.sogou.char"
    emb_dim = 300
    filename_trimmed_dir = "./THUCNews/data/vocab.embedding.sougou"
    word_to_id = pkl.load(open(vocab_dir, 'rb'))
    embeddings = np.random.rand(len(word_to_id), emb_dim)
    f = open(pretrain_dir, "r", encoding='UTF-8')
    for i, line in enumerate(f.readlines()):
        # if i == 0:  # skip the first line if it is a header
        #     continue
        lin = line.strip().split(" ")
        if lin[0] in word_to_id:
            idx = word_to_id[lin[0]]
            emb = [float(x) for x in lin[1:301]]
            embeddings[idx] = np.asarray(emb, dtype='float32')
    f.close()
    np.savez_compressed(filename_trimmed_dir, embeddings=embeddings)
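
To make the n-gram trick in load_dataset() concrete, here is a small standalone sketch of the two hash functions applied to a toy word-id sequence. The hash functions are copied from the listing above; the word ids are made up, and the bucket count matches config.n_gram_vocab in the model configuration below.

```python
# Standalone sketch of the bigram/trigram hashing used in load_dataset() above.
def biGramHash(sequence, t, buckets):
    t1 = sequence[t - 1] if t - 1 >= 0 else 0
    return (t1 * 14918087) % buckets

def triGramHash(sequence, t, buckets):
    t1 = sequence[t - 1] if t - 1 >= 0 else 0
    t2 = sequence[t - 2] if t - 2 >= 0 else 0
    return (t2 * 14918087 * 18408749 + t1 * 14918087) % buckets

buckets = 250499                 # same value as config.n_gram_vocab
words_line = [17, 256, 4, 9981]  # toy word ids after the vocabulary lookup
bigram = [biGramHash(words_line, i, buckets) for i in range(len(words_line))]
trigram = [triGramHash(words_line, i, buckets) for i in range(len(words_line))]
print(bigram)   # one hash-bucket id per position, later fed to embedding_ngram2
print(trigram)  # one hash-bucket id per position, later fed to embedding_ngram3
```

Each position thus carries one word id plus two hashed n-gram ids, which is exactly the (words_line, bigram, trigram) triple stored per sample above.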

train_eval.py

# coding: UTF-8
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn import metrics
import time
from utils import get_time_dif
from tensorboardX import SummaryWriter


# Weight initialisation, xavier by default (without it the default random weights
# can be badly scaled and hurt training)
def init_network(model, method='xavier', exclude='embedding', seed=123):
    for name, w in model.named_parameters():  # iterate over all trainable parameters
        if exclude not in name:  # skip parameters whose name contains the excluded keyword ('embedding' by default)
            if 'weight' in name:  # initialise weights
                if method == 'xavier':
                    nn.init.xavier_normal_(w)  # pick the requested initialisation method
                elif method == 'kaiming':
                    nn.init.kaiming_normal_(w)
                else:
                    nn.init.normal_(w)
            elif 'bias' in name:  # initialise biases
                nn.init.constant_(w, 0)
            else:  # skip everything that is neither a weight nor a bias
                pass


def train(config, model, train_iter, dev_iter, test_iter):
    start_time = time.time()
    model.train()  # model.train() enables BatchNormalization and Dropout; model.eval() disables them
    optimizer = torch.optim.Adam(model.parameters(), lr=config.learning_rate)  # choose the optimiser

    # Exponential learning-rate decay: every epoch, learning rate = gamma * learning rate
    # scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.9)
    total_batch = 0  # number of batches processed so far
    dev_best_loss = float('inf')
    last_improve = 0  # batch counter at the last improvement of the validation loss
    flag = False  # whether training has gone a long time without improvement
    writer = SummaryWriter(log_dir=config.log_path + '/' + time.strftime('%m-%d_%H.%M', time.localtime()))
    for epoch in range(config.num_epochs):
        print('Epoch [{}/{}]'.format(epoch + 1, config.num_epochs))
        # scheduler.step()  # learning-rate decay
        for i, (trains, labels) in enumerate(train_iter):
            outputs = model(trains)
            model.zero_grad()
            loss = F.cross_entropy(outputs, labels)
            loss.backward()
            optimizer.step()
            if total_batch % 100 == 0:
                # every 100 batches, report performance on the training and validation sets
                true = labels.data.cpu()  # move the labels to the CPU
                predic = torch.max(outputs.data, 1)[1].cpu()  # column index of the maximum in each row, i.e. the predicted class
                train_acc = metrics.accuracy_score(true, predic)  # accuracy on this training batch
                dev_acc, dev_loss = evaluate(config, model, dev_iter)  # accuracy and loss on the validation set
                if dev_loss < dev_best_loss:  # use the validation loss to decide whether the model improved
                    dev_best_loss = dev_loss
                    torch.save(model.state_dict(), config.save_path)
                    improve = '*'
                    last_improve = total_batch
                else:
                    improve = ''
                time_dif = get_time_dif(start_time)
                msg = 'Iter: {0:>6}, Train Loss: {1:>5.2}, Train Acc: {2:>6.2%}, Val Loss: {3:>5.2}, Val Acc: {4:>6.2%}, Time: {5} {6}'
                print(msg.format(total_batch, loss.item(), train_acc, dev_loss, dev_acc, time_dif, improve))
                writer.add_scalar("loss/train", loss.item(), total_batch)
                writer.add_scalar("loss/dev", dev_loss, total_batch)
                writer.add_scalar("acc/train", train_acc, total_batch)
                writer.add_scalar("acc/dev", dev_acc, total_batch)
                model.train()
            total_batch += 1
            if total_batch - last_improve > config.require_improvement:
                # stop training if the validation loss has not improved for more than
                # config.require_improvement batches (1000 by default)
                print("No optimization for a long time, auto-stopping...")
                flag = True
                break
        if flag:
            break
    writer.close()
    test(config, model, test_iter)


def test(config, model, test_iter):
    # test
    model.load_state_dict(torch.load(config.save_path))
    model.eval()
    start_time = time.time()
    test_acc, test_loss, test_report, test_confusion = evaluate(config, model, test_iter, test=True)
    msg = 'Test Loss: {0:>5.2}, Test Acc: {1:>6.2%}'
    print(msg.format(test_loss, test_acc))
    print("Precision, Recall and F1-Score...")
    print(test_report)
    print("Confusion Matrix...")
    print(test_confusion)
    time_dif = get_time_dif(start_time)
    print("Time usage:", time_dif)


def evaluate(config, model, data_iter, test=False):
    model.eval()  # disable BatchNormalization and Dropout
    loss_total = 0
    predict_all = np.array([], dtype=int)
    labels_all = np.array([], dtype=int)
    with torch.no_grad():  # do not track gradients
        for texts, labels in data_iter:  # for every batch in the dataset
            outputs = model(texts)  # run the model
            loss = F.cross_entropy(outputs, labels)  # compute the loss
            loss_total += loss  # accumulate the loss
            labels = labels.data.cpu().numpy()
            predic = torch.max(outputs.data, 1)[1].cpu().numpy()
            labels_all = np.append(labels_all, labels)  # collect the labels
            predict_all = np.append(predict_all, predic)  # collect the predictions

    acc = metrics.accuracy_score(labels_all, predict_all)  # classification accuracy
    if test:
        report = metrics.classification_report(labels_all, predict_all, target_names=config.class_list, digits=4)
        confusion = metrics.confusion_matrix(labels_all, predict_all)
        return acc, loss_total / len(data_iter), report, confusion
    return acc, loss_total / len(data_iter)  # return the accuracy and the average loss
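
For reference, these are the sklearn calls that evaluate() relies on, run on a few made-up labels so the shape of the test() printout is clear (the labels and class names here are hypothetical, standing in for config.class_list):

```python
import numpy as np
from sklearn import metrics

labels_all = np.array([0, 1, 2, 2, 1, 0])      # hypothetical true labels
predict_all = np.array([0, 1, 2, 1, 1, 0])     # hypothetical predictions
class_list = ['finance', 'sports', 'science']  # stands in for config.class_list

print(metrics.accuracy_score(labels_all, predict_all))  # overall accuracy
print(metrics.classification_report(labels_all, predict_all,
                                    target_names=class_list, digits=4))  # per-class precision/recall/F1
print(metrics.confusion_matrix(labels_all, predict_all))  # rows: true classes, columns: predicted classes
```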

The FastText model (models/FastText.py)

# coding: UTF-8
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np


class Config(object):

    """Configuration parameters."""
    def __init__(self, dataset, embedding):
        self.model_name = 'FastText'
        self.train_path = dataset + '/data/train.txt'  # training set
        self.dev_path = dataset + '/data/dev.txt'  # validation set
        self.test_path = dataset + '/data/test.txt'  # test set
        self.class_list = [x.strip() for x in open(
            dataset + '/data/class.txt', encoding='utf-8').readlines()]  # class names
        self.vocab_path = dataset + '/data/vocab.pkl'  # vocabulary
        self.save_path = dataset + '/saved_dict/' + self.model_name + '.ckpt'  # trained model checkpoint
        self.log_path = dataset + '/log/' + self.model_name
        self.embedding_pretrained = torch.tensor(
            np.load(dataset + '/data/' + embedding)["embeddings"].astype('float32'))\
            if embedding != 'random' else None  # pretrained embeddings
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')  # device

        self.dropout = 0.5  # dropout rate
        self.require_improvement = 1000  # stop training early if there is no improvement for this many batches
        self.num_classes = len(self.class_list)  # number of classes
        self.n_vocab = 0  # vocabulary size, set at runtime
        self.num_epochs = 20  # number of epochs
        self.batch_size = 128  # mini-batch size
        self.pad_size = 32  # every sentence is padded/truncated to this length
        self.learning_rate = 1e-3  # learning rate
        self.embed = self.embedding_pretrained.size(1)\
            if self.embedding_pretrained is not None else 300  # embedding dimension
        self.hidden_size = 256  # hidden layer size
        self.n_gram_vocab = 250499  # n-gram hash-bucket count


'''Bag of Tricks for Efficient Text Classification'''


class Model(nn.Module):
    def __init__(self, config):
        super(Model, self).__init__()
        if config.embedding_pretrained is not None:
            self.embedding = nn.Embedding.from_pretrained(config.embedding_pretrained, freeze=False)
        else:
            self.embedding = nn.Embedding(config.n_vocab, config.embed, padding_idx=config.n_vocab - 1)
        self.embedding_ngram2 = nn.Embedding(config.n_gram_vocab, config.embed)
        self.embedding_ngram3 = nn.Embedding(config.n_gram_vocab, config.embed)
        self.dropout = nn.Dropout(config.dropout)
        self.fc1 = nn.Linear(config.embed * 3, config.hidden_size)
        # self.dropout2 = nn.Dropout(config.dropout)
        self.fc2 = nn.Linear(config.hidden_size, config.num_classes)

    def forward(self, x):
        # x is the tuple (words_line, seq_len, bigram, trigram) built by DatasetIterater._to_tensor()
        out_word = self.embedding(x[0])
        out_bigram = self.embedding_ngram2(x[2])
        out_trigram = self.embedding_ngram3(x[3])
        out = torch.cat((out_word, out_bigram, out_trigram), -1)

        out = out.mean(dim=1)
        out = self.dropout(out)
        out = self.fc1(out)
        out = F.relu(out)
        out = self.fc2(out)
        return out
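
A quick shape check of the forward pass can be done with a mock config and random index tensors. This is only a sketch under the assumption that the listing above is saved as models/FastText.py, as in the original repo; a real run builds Config from the THUCNews files instead (see runFastText.py below).

```python
import torch
from types import SimpleNamespace
from models.FastText import Model  # assumes the listing above is saved as models/FastText.py

# Mock config carrying only the attributes Model.__init__ reads.
cfg = SimpleNamespace(
    embedding_pretrained=None,  # fall back to a randomly initialised nn.Embedding
    n_vocab=10002,              # vocabulary size incl. <UNK> and <PAD>
    embed=300,                  # embedding dimension
    n_gram_vocab=250499,        # n-gram hash-bucket count
    dropout=0.5,
    hidden_size=256,
    num_classes=10,
)
model = Model(cfg)

batch_size, pad_size = 4, 32
x = (
    torch.randint(0, cfg.n_vocab, (batch_size, pad_size)),       # word ids        [4, 32]
    torch.full((batch_size,), pad_size, dtype=torch.long),       # seq_len (not used in forward)
    torch.randint(0, cfg.n_gram_vocab, (batch_size, pad_size)),  # bigram buckets  [4, 32]
    torch.randint(0, cfg.n_gram_vocab, (batch_size, pad_size)),  # trigram buckets [4, 32]
)
logits = model(x)
print(logits.shape)  # torch.Size([4, 10]): the three embeddings are concatenated to [4, 32, 900],
                     # averaged over the 32 positions, then passed through fc1 and fc2
```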

runFastText.py (main script)

# coding: UTF-8
import time
import torch
import numpy as np
from train_eval import train, init_network
from importlib import import_module
from utils_fastTextTest import build_dataset, build_iterator, get_time_dif
# import argparse

# parser = argparse.ArgumentParser(description='Chinese Text Classification')
# parser.add_argument('--model', type=str, required=True, help='choose a model: TextCNN, TextRNN, FastText, TextRCNN, TextRNN_Att, DPCNN, Transformer')
# parser.add_argument('--embedding', default='pre_trained', type=str, help='random or pre_trained')
# parser.add_argument('--word', default=False, type=bool, help='True for word, False for char')
# args = parser.parse_args()


if __name__ == '__main__':
    dataset = 'THUCNews'  # dataset directory

    # Sogou News: embedding_SougouNews.npz, Tencent: embedding_Tencent.npz, random initialisation: random
    embedding = 'embedding_SougouNews.npz'
    model_name = 'FastText'  # 'TextRCNN'  # TextCNN, TextRNN, FastText, TextRCNN, TextRNN_Att, DPCNN, Transformer

    x = import_module('models.' + model_name)
    config = x.Config(dataset, embedding)  # load the model's configuration
    np.random.seed(1)
    torch.manual_seed(1)
    torch.cuda.manual_seed_all(1)
    torch.backends.cudnn.deterministic = True  # fix the random seeds so every run gives the same result

    start_time = time.time()
    print("Loading data...")
    vocab, train_data, dev_data, test_data = build_dataset(config, False)
    train_iter = build_iterator(train_data, config)
    dev_iter = build_iterator(dev_data, config)
    test_iter = build_iterator(test_data, config)
    time_dif = get_time_dif(start_time)
    print("Time usage:", time_dif)

    # train
    config.n_vocab = len(vocab)
    model = x.Model(config).to(config.device)
    if model_name != 'Transformer':
        init_network(model)
    print(model.parameters)
    train(config, model, train_iter, dev_iter, test_iter)

Training results

Since I have no GPU and the code includes the n-gram features, training is slow; the screenshot covers only two epochs.
Reference results: without n-gram information the model is just a bag of words and reaches 89.59% accuracy; adding 2-gram and 3-gram features raises it to 92.23%.
[Screenshot of the training log for the first two epochs omitted]


References

1. 中文文本分类 pytorch实现 (Chinese text classification, PyTorch implementation)
2. 中文文本分类代码分析 (Analysis of the Chinese text-classification code)
3. FastText中词序的处理 (How FastText handles word order)