Skip to content

Commit ebbf32c

Browse files
author
unknown
committed
add hmm
1 parent e766a49 commit ebbf32c

File tree

1 file changed

+238
-0
lines changed

1 file changed

+238
-0
lines changed

hmm/hmm.py

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
# encoding=utf8
2+
3+
import numpy as np
4+
import csv
5+
6+
class HMM(object):
7+
def __init__(self,N,M):
8+
self.A = np.zeros((N,N)) # 状态转移概率矩阵
9+
self.B = np.zeros((N,M)) # 观测概率矩阵
10+
self.Pi = np.array([1.0/N]*N) # 初始状态概率矩阵
11+
12+
self.N = N # 可能的状态数
13+
self.M = M # 可能的观测数
14+
15+
def cal_probality(self, O):
16+
self.T = len(O)
17+
self.O = O
18+
19+
self.forward()
20+
return sum(self.alpha[self.T-1])
21+
22+
def forward(self):
23+
"""
24+
前向算法
25+
"""
26+
self.alpha = np.zeros((self.T,self.N))
27+
28+
# 公式 10.15
29+
for i in range(self.N):
30+
self.alpha[0][i] = self.Pi[i]*self.B[i][self.O[0]]
31+
32+
# 公式10.16
33+
for t in range(1,self.T):
34+
for i in range(self.N):
35+
sum = 0
36+
for j in range(self.N):
37+
sum += self.alpha[t-1][j]*self.A[j][i]
38+
self.alpha[t][i] = sum * self.B[i][self.O[t]]
39+
40+
def backward(self):
41+
"""
42+
后向算法
43+
"""
44+
self.beta = np.zeros((self.T,self.N))
45+
46+
# 公式10.19
47+
for i in range(self.N):
48+
self.beta[self.T-1][i] = 1
49+
50+
# 公式10.20
51+
for t in range(self.T-2,-1,-1):
52+
for i in range(self.N):
53+
for j in range(self.N):
54+
self.beta[t][i] += self.A[i][j]*self.B[j][self.O[t+1]]*self.beta[t+1][j]
55+
56+
def cal_gamma(self, i, t):
57+
"""
58+
公式 10.24
59+
"""
60+
numerator = self.alpha[t][i]*self.beta[t][i]
61+
denominator = 0
62+
63+
for j in range(self.N):
64+
denominator += self.alpha[t][j]*self.beta[t][j]
65+
66+
return numerator/denominator
67+
68+
def cal_ksi(self, i, j, t):
69+
"""
70+
公式 10.26
71+
"""
72+
73+
numerator = self.alpha[t][i]*self.A[i][j]*self.B[j][self.O[t+1]]*self.beta[t+1][j]
74+
denominator = 0
75+
76+
for i in range(self.N):
77+
for j in range(self.N):
78+
denominator += self.alpha[t][i]*self.A[i][j]*self.B[j][self.O[t+1]]*self.beta[t+1][j]
79+
80+
return numerator/denominator
81+
82+
def init(self):
83+
"""
84+
随机生成 A,B,Pi
85+
并保证每行相加等于 1
86+
"""
87+
import random
88+
for i in range(self.N):
89+
randomlist = [random.randint(0,100) for t in range(self.N)]
90+
Sum = sum(randomlist)
91+
for j in range(self.N):
92+
self.A[i][j] = randomlist[j]/Sum
93+
94+
for i in range(self.N):
95+
randomlist = [random.randint(0,100) for t in range(self.M)]
96+
Sum = sum(randomlist)
97+
for j in range(self.M):
98+
self.B[i][j] = randomlist[j]/Sum
99+
100+
def train(self, O, MaxSteps = 100):
101+
self.T = len(O)
102+
self.O = O
103+
104+
# 初始化
105+
self.init()
106+
107+
step = 0
108+
# 递推
109+
while step<MaxSteps:
110+
step+=1
111+
print(step)
112+
tmp_A = np.zeros((self.N,self.N))
113+
tmp_B = np.zeros((self.N,self.M))
114+
tmp_pi = np.array([0.0]*self.N)
115+
116+
self.forward()
117+
self.backward()
118+
119+
# a_{ij}
120+
for i in range(self.N):
121+
for j in range(self.N):
122+
numerator=0.0
123+
denominator=0.0
124+
for t in range(self.T-1):
125+
numerator += self.cal_ksi(i,j,t)
126+
denominator += self.cal_gamma(i,t)
127+
tmp_A[i][j] = numerator/denominator
128+
129+
# b_{jk}
130+
for j in range(self.N):
131+
for k in range(self.M):
132+
numerator = 0.0
133+
denominator = 0.0
134+
for t in range(self.T):
135+
if k == self.O[t]:
136+
numerator += self.cal_gamma(j,t)
137+
denominator += self.cal_gamma(j,t)
138+
tmp_B[j][k] = numerator / denominator
139+
140+
# pi_i
141+
for i in range(self.N):
142+
tmp_pi[i] = self.cal_gamma(i,0)
143+
144+
self.A = tmp_A
145+
self.B = tmp_B
146+
self.Pi = tmp_pi
147+
148+
def generate(self, length):
149+
import random
150+
I = []
151+
152+
# start
153+
ran = random.randint(0,1000)/1000.0
154+
i = 0
155+
while self.Pi[i]<ran or self.Pi[i]<0.0001:
156+
ran -= self.Pi[i]
157+
i += 1
158+
I.append(i)
159+
160+
# 生成状态序列
161+
for i in range(1,length):
162+
last = I[-1]
163+
ran = random.randint(0, 1000) / 1000.0
164+
i = 0
165+
while self.A[last][i] < ran or self.A[last][i]<0.0001:
166+
ran -= self.A[last][i]
167+
i += 1
168+
I.append(i)
169+
170+
# 生成观测序列
171+
Y = []
172+
for i in range(length):
173+
k = 0
174+
ran = random.randint(0, 1000) / 1000.0
175+
while self.B[I[i]][k] < ran or self.B[I[i]][k]<0.0001:
176+
ran -= self.B[I[i]][k]
177+
k += 1
178+
Y.append(k)
179+
180+
return Y
181+
182+
183+
184+
def triangle(length):
185+
'''
186+
三角波
187+
'''
188+
X = [i for i in range(length)]
189+
Y = []
190+
191+
for x in X:
192+
x = x % 6
193+
if x <= 3:
194+
Y.append(x)
195+
else:
196+
Y.append(6-x)
197+
return X,Y
198+
199+
def sin(length):
200+
'''
201+
三角波
202+
'''
203+
import math
204+
X = [i for i in range(length)]
205+
Y = []
206+
207+
for x in X:
208+
x = x % 20
209+
Y.append(int(math.sin((x*math.pi)/10)*50)+50)
210+
return X,Y
211+
212+
213+
214+
def show_data(x,y):
215+
import matplotlib.pyplot as plt
216+
plt.plot(x, y, 'g')
217+
plt.show()
218+
219+
return y
220+
221+
222+
if __name__ == '__main__':
223+
hmm = HMM(10,4)
224+
tri_x, tri_y = triangle(20)
225+
226+
hmm.train(tri_y)
227+
y = hmm.generate(100)
228+
x = [i for i in range(100)]
229+
show_data(x,y)
230+
231+
# hmm = HMM(15,101)
232+
# sin_x, sin_y = sin(40)
233+
# show_data(sin_x, sin_y)
234+
# hmm.train(sin_y)
235+
# y = hmm.generate(100)
236+
# x = [i for i in range(100)]
237+
# show_data(x,y)
238+

0 commit comments

Comments
 (0)