
A PyTorch implementation of BiLSTM+CRF

CRF Code Implementation

I recently had some free time and wanted to go over CRF-related material again, so I studied the PyTorch CRF implementation. The code is shown below; after this I plan to look into how to build a batched version of the CRF.

On the surface, this CRF implementation looks almost identical to an HMM. From my point of view, the difference is that the emission (observation) scores are produced by a neural model, i.e. we directly model $P(Y|X)$, which makes it a discriminative model, and that is exactly why it is a CRF. To turn it into a hidden Markov model we would instead have to model the joint distribution $P(Y,X)$. Interestingly, how the emission scores are obtained determines which type of model we end up with.
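To make the contrast concrete, here are the standard formulas, written with the same emission/transition roles played by the feats and transitions tensors in the code below. The linear-chain CRF normalizes a sum of emission and transition scores over all possible tag sequences, while the HMM factorizes the joint probability:

$$
P(y \mid x) = \frac{\exp\Big(\sum_{t}\big(\mathrm{emit}_t(y_t) + \mathrm{trans}(y_{t-1}, y_t)\big)\Big)}{\sum_{y'}\exp\Big(\sum_{t}\big(\mathrm{emit}_t(y'_t) + \mathrm{trans}(y'_{t-1}, y'_t)\big)\Big)}
\qquad
P(y, x) = \prod_{t} P(y_t \mid y_{t-1})\, P(x_t \mid y_t)
$$

The denominator of the CRF (the partition function) is exactly what _forward_alg computes below, in log space.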

In the code, the key parts are the computation of the forward (partition) score and the Viterbi decoding. The code below uses a Bi-LSTM to produce the CRF's emission scores; this component is essential, but it can be swapped for other models, e.g. Transformer-style encoders, as sketched right after this paragraph.
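As an illustration only (the class name TransformerEmitter and all hyperparameters below are my own, not part of the original code), here is a minimal sketch of producing emission scores with nn.TransformerEncoder instead of the Bi-LSTM; positional encodings are omitted for brevity:

import torch
import torch.nn as nn

class TransformerEmitter(nn.Module):
    # hypothetical drop-in replacement for _get_lstm_features:
    # word indices -> (seq_len, tagset_size) emission scores
    def __init__(self, vocab_size, tagset_size, embedding_dim=64, nhead=4, num_layers=2):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, embedding_dim)
        encoder_layer = nn.TransformerEncoderLayer(d_model=embedding_dim, nhead=nhead)
        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.hidden2tag = nn.Linear(embedding_dim, tagset_size)

    def forward(self, sentence):                  # sentence: (seq_len,) tensor of word indices
        x = self.embed(sentence).unsqueeze(1)     # (seq_len, 1, embedding_dim), batch size 1
        h = self.encoder(x)                       # contextualized representations, same shape
        return self.hidden2tag(h).squeeze(1)      # (seq_len, tagset_size) emission scores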

For a walkthrough of this code, I found a good answer on Zhihu. If you are not yet familiar with the code, I suggest first reading the theoretical introduction in the book 《统计学习方法》 (Statistical Learning Methods), and then the article "PyTorch Bi-LSTM+CRF NER标注代码精读". By working through both the theory and the implementation, I believe you can get a solid understanding of this material.

import torch
import torch.autograd as autograd
import torch.nn as nn
import torch.optim as optim

torch.manual_seed(6)

START_TAG = "<START>"
STOP_TAG = "<STOP>"

def argmax(vec):
    # return the index of the max value along dim 1 as a python int
    _, idx = torch.max(vec, 1)
    return idx.item()

def prepare_sequence(seq, to_ix):
    # map a list of words to a LongTensor of word indices
    idxs = [to_ix[w] for w in seq]
    return torch.tensor(idxs, dtype=torch.long)


# subtract the max value before exponentiating to avoid overflow in exp
def log_sum_exp(vec):
    max_score = vec[0, argmax(vec)]
    max_score_broadcast = max_score.view(1, -1).expand(1, vec.size()[1])

    return max_score + torch.log(torch.sum(torch.exp(vec - max_score_broadcast)))
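# Sanity check (my own addition, not in the original code): for a 1 x tagset_size
# row of scores, this helper should agree with the built-in torch.logsumexp, e.g.
#   v = torch.randn(1, 5)
#   assert torch.allclose(log_sum_exp(v), torch.logsumexp(v, dim=1).squeeze())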

class BiLSTM_CRF(nn.Module):

    def __init__(self, vocab_size, tag_to_idx, embedding_dim, hidden_dim):
        super(BiLSTM_CRF, self).__init__()

        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.vocab_size = vocab_size
        self.tag_to_ix = tag_to_idx
        self.tagset_size = len(tag_to_idx)

        self.word_embeds = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim // 2, num_layers=1, bidirectional=True)

        # map the LSTM hidden states to tag scores
        self.hidden2tag = nn.Linear(hidden_dim, self.tagset_size)

        # transition score matrix: entry (i, j) is the score of transitioning from tag j to tag i
        self.transitions = nn.Parameter(
            torch.randn(self.tagset_size, self.tagset_size)
        )

        # constrain the transitions: no tag may transition into the start tag,
        # and no tag may transition out of the stop tag
        self.transitions.data[tag_to_idx[START_TAG], :] = -10000
        self.transitions.data[:, tag_to_idx[STOP_TAG]] = -10000

        self.hidden = self.init_hidden()

    # initialize the LSTM hidden and cell states
    def init_hidden(self):
        return (torch.randn(2, 1, self.hidden_dim // 2),
                torch.randn(2, 1, self.hidden_dim // 2))


    def _forward_alg(self, feats):
        # compute the log partition function with the forward algorithm:
        # alpha_t(j) = logsumexp_i( alpha_{t-1}(i) + trans(i -> j) ) + emit(t, j)

        init_alphas = torch.full((1, self.tagset_size), -10000.)
        # START_TAG carries all of the initial score
        init_alphas[0][self.tag_to_ix[START_TAG]] = 0.

        forward_var = init_alphas

        # iterate over the words in the sentence
        for feat in feats:

            alphas_t = []  # forward scores for every tag at the current time step

            for next_tag in range(self.tagset_size):  # iterate over all tags
                # broadcast the emission score: it is the same regardless of the previous tag
                emit_score = feat[next_tag].view(1, -1).expand(1, self.tagset_size)

                # the i-th entry of trans_score is the score of transitioning from tag i to next_tag
                trans_score = self.transitions[next_tag].view(1, -1)

                next_tag_var = forward_var + trans_score + emit_score

                alphas_t.append(log_sum_exp(next_tag_var).view(1))

            # the tag distribution of the current step becomes the forward variable for the next step
            forward_var = torch.cat(alphas_t).view(1, -1)

        terminal_var = forward_var + self.transitions[self.tag_to_ix[STOP_TAG]]

        alpha = log_sum_exp(terminal_var)

        return alpha

    def _get_lstm_features(self, sentence):
        # run the Bi-LSTM over the sentence and project to tag scores (the emission matrix)
        self.hidden = self.init_hidden()

        embeds = self.word_embeds(sentence).view(len(sentence), 1, -1)

        lstm_out, self.hidden = self.lstm(embeds, self.hidden)

        lstm_out = lstm_out.view(len(sentence), self.hidden_dim)

        lstm_feats = self.hidden2tag(lstm_out)

        return lstm_feats

    def _score_sentence(self, feats, tags):
        # score of the gold tag sequence under the current emission features and transitions

        score = torch.zeros(1)

        tags = torch.cat([torch.tensor([self.tag_to_ix[START_TAG]], dtype=torch.long), tags])

        # the score is the sum of the transition scores between consecutive gold tags
        # plus the emission score of each gold tag
        for i, feat in enumerate(feats):
            score = score + \
                self.transitions[tags[i + 1], tags[i]] + feat[tags[i + 1]]
        score = score + self.transitions[self.tag_to_ix[STOP_TAG], tags[-1]]

        return score


    def _viterbi_decode(self, feats):
        backpointers = []

        # initial Viterbi variables
        init_vvars = torch.full((1, self.tagset_size), -10000.)
        init_vvars[0][self.tag_to_ix[START_TAG]] = 0

        forward_var = init_vvars
        for feat in feats:
            bptrs_t = []  # for each current tag, the best previous tag at this step

            viterbivars_t = []  # the best path scores for each tag at this step

            for next_tag in range(self.tagset_size):
                # next_tag_var[i] holds the Viterbi variable for tag i at the previous step
                # plus the score of transitioning from tag i to next_tag.
                # the emission score is not added here because the argmax does not depend on it
                next_tag_var = forward_var + self.transitions[next_tag]

                best_tag_id = argmax(next_tag_var)
                bptrs_t.append(best_tag_id)
                viterbivars_t.append(next_tag_var[0][best_tag_id].view(1))

            # now add the emission scores and move on to the next step
            forward_var = (torch.cat(viterbivars_t) + feat).view(1, -1)
            backpointers.append(bptrs_t)

        # transition to STOP_TAG
        terminal_var = forward_var + self.transitions[self.tag_to_ix[STOP_TAG]]
        best_tag_id = argmax(terminal_var)
        path_score = terminal_var[0][best_tag_id]

        # follow the backpointers to recover the best path
        best_path = [best_tag_id]
        for bptrs_t in reversed(backpointers):
            best_tag_id = bptrs_t[best_tag_id]
            best_path.append(best_tag_id)

        # remove the START tag from the recovered sequence
        start = best_path.pop()
        assert start == self.tag_to_ix[START_TAG]  # sanity check
        best_path.reverse()
        return path_score, best_path

    def neg_log_likelihood(self, sentence, tags):
        # get the emission score matrix from the Bi-LSTM
        feats = self._get_lstm_features(sentence)

        # loss = log partition function - score of the gold sequence
        forward_score = self._forward_alg(feats)
        gold_score = self._score_sentence(feats, tags)
        return forward_score - gold_score

    def forward(self, sentence):
        lstm_feats = self._get_lstm_features(sentence)

        score, tag_seq = self._viterbi_decode(lstm_feats)

        return score, tag_seq


EMBEDDING_DIM = 5
HIDDEN_DIM = 4

training_data = [(
    "the wall street journal reported today that apple corporation made money".split(),
    "B I I I O O O B I O O".split()
), (
    "georgia tech is a university in georgia".split(),
    "B I O O O O B".split()
)]

word_to_ix = {}
for sentence, tags in training_data:
    for word in sentence:
        if word not in word_to_ix:
            word_to_ix[word] = len(word_to_ix)

tag_to_ix = {"B": 0, "I": 1, "O": 2, START_TAG: 3, STOP_TAG: 4}

model = BiLSTM_CRF(len(word_to_ix), tag_to_ix, EMBEDDING_DIM, HIDDEN_DIM)

optimizer = optim.Adam(model.parameters(), lr=0.01, weight_decay=1e-4)

# check the predictions before training
with torch.no_grad():
    precheck_sent = prepare_sequence(training_data[0][0], word_to_ix)
    precheck_tags = torch.tensor([tag_to_ix[t] for t in training_data[0][1]], dtype=torch.long)
    print(model(precheck_sent))

losses = []
for epoch in range(300):
    for sentence, tags in training_data:

        model.zero_grad()

        sentence = prepare_sequence(sentence, word_to_ix)
        targets = torch.tensor([tag_to_ix[t] for t in tags], dtype=torch.long)

        loss = model.neg_log_likelihood(sentence, targets)
        losses.append(loss.item())
        loss.backward()
        optimizer.step()

# check the predictions after training
with torch.no_grad():
    precheck_sent = prepare_sequence(training_data[0][0], word_to_ix)
    precheck_tags = torch.tensor([tag_to_ix[t] for t in training_data[0][1]], dtype=torch.long)
    print(model(precheck_sent))

print(losses)
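As a small teaser for the batched CRF mentioned at the top, the per-sentence log-sum-exp above generalizes directly to a batch of score rows. A minimal sketch (my own addition, assuming scores of shape (batch_size, tagset_size)):

def log_sum_exp_batch(vec):
    # vec: (batch_size, tagset_size); one numerically stable log-sum-exp per row
    return torch.logsumexp(vec, dim=1)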
O(∩_∩)O Haha~