|
| 1 | +# encoding=utf8 |
| 2 | + |
| 3 | +import numpy as np |
| 4 | +import csv |
| 5 | + |
| 6 | +class HMM(object): |
| 7 | + def __init__(self,N,M): |
| 8 | + self.A = np.zeros((N,N)) # 状态转移概率矩阵 |
| 9 | + self.B = np.zeros((N,M)) # 观测概率矩阵 |
| 10 | + self.Pi = np.array([1.0/N]*N) # 初始状态概率矩阵 |
| 11 | + |
| 12 | + self.N = N # 可能的状态数 |
| 13 | + self.M = M # 可能的观测数 |
| 14 | + |
| 15 | + def cal_probality(self, O): |
| 16 | + self.T = len(O) |
| 17 | + self.O = O |
| 18 | + |
| 19 | + self.forward() |
| 20 | + return sum(self.alpha[self.T-1]) |
| 21 | + |
| 22 | + def forward(self): |
| 23 | + """ |
| 24 | + 前向算法 |
| 25 | + """ |
| 26 | + self.alpha = np.zeros((self.T,self.N)) |
| 27 | + |
| 28 | + # 公式 10.15 |
| 29 | + for i in range(self.N): |
| 30 | + self.alpha[0][i] = self.Pi[i]*self.B[i][self.O[0]] |
| 31 | + |
| 32 | + # 公式10.16 |
| 33 | + for t in range(1,self.T): |
| 34 | + for i in range(self.N): |
| 35 | + sum = 0 |
| 36 | + for j in range(self.N): |
| 37 | + sum += self.alpha[t-1][j]*self.A[j][i] |
| 38 | + self.alpha[t][i] = sum * self.B[i][self.O[t]] |
| 39 | + |
| 40 | + def backward(self): |
| 41 | + """ |
| 42 | + 后向算法 |
| 43 | + """ |
| 44 | + self.beta = np.zeros((self.T,self.N)) |
| 45 | + |
| 46 | + # 公式10.19 |
| 47 | + for i in range(self.N): |
| 48 | + self.beta[self.T-1][i] = 1 |
| 49 | + |
| 50 | + # 公式10.20 |
| 51 | + for t in range(self.T-2,-1,-1): |
| 52 | + for i in range(self.N): |
| 53 | + for j in range(self.N): |
| 54 | + self.beta[t][i] += self.A[i][j]*self.B[j][self.O[t+1]]*self.beta[t+1][j] |
| 55 | + |
| 56 | + def cal_gamma(self, i, t): |
| 57 | + """ |
| 58 | + 公式 10.24 |
| 59 | + """ |
| 60 | + numerator = self.alpha[t][i]*self.beta[t][i] |
| 61 | + denominator = 0 |
| 62 | + |
| 63 | + for j in range(self.N): |
| 64 | + denominator += self.alpha[t][j]*self.beta[t][j] |
| 65 | + |
| 66 | + return numerator/denominator |
| 67 | + |
| 68 | + def cal_ksi(self, i, j, t): |
| 69 | + """ |
| 70 | + 公式 10.26 |
| 71 | + """ |
| 72 | + |
| 73 | + numerator = self.alpha[t][i]*self.A[i][j]*self.B[j][self.O[t+1]]*self.beta[t+1][j] |
| 74 | + denominator = 0 |
| 75 | + |
| 76 | + for i in range(self.N): |
| 77 | + for j in range(self.N): |
| 78 | + denominator += self.alpha[t][i]*self.A[i][j]*self.B[j][self.O[t+1]]*self.beta[t+1][j] |
| 79 | + |
| 80 | + return numerator/denominator |
| 81 | + |
| 82 | + def init(self): |
| 83 | + """ |
| 84 | + 随机生成 A,B,Pi |
| 85 | + 并保证每行相加等于 1 |
| 86 | + """ |
| 87 | + import random |
| 88 | + for i in range(self.N): |
| 89 | + randomlist = [random.randint(0,100) for t in range(self.N)] |
| 90 | + Sum = sum(randomlist) |
| 91 | + for j in range(self.N): |
| 92 | + self.A[i][j] = randomlist[j]/Sum |
| 93 | + |
| 94 | + for i in range(self.N): |
| 95 | + randomlist = [random.randint(0,100) for t in range(self.M)] |
| 96 | + Sum = sum(randomlist) |
| 97 | + for j in range(self.M): |
| 98 | + self.B[i][j] = randomlist[j]/Sum |
| 99 | + |
| 100 | + def train(self, O, MaxSteps = 100): |
| 101 | + self.T = len(O) |
| 102 | + self.O = O |
| 103 | + |
| 104 | + # 初始化 |
| 105 | + self.init() |
| 106 | + |
| 107 | + step = 0 |
| 108 | + # 递推 |
| 109 | + while step<MaxSteps: |
| 110 | + step+=1 |
| 111 | + print(step) |
| 112 | + tmp_A = np.zeros((self.N,self.N)) |
| 113 | + tmp_B = np.zeros((self.N,self.M)) |
| 114 | + tmp_pi = np.array([0.0]*self.N) |
| 115 | + |
| 116 | + self.forward() |
| 117 | + self.backward() |
| 118 | + |
| 119 | + # a_{ij} |
| 120 | + for i in range(self.N): |
| 121 | + for j in range(self.N): |
| 122 | + numerator=0.0 |
| 123 | + denominator=0.0 |
| 124 | + for t in range(self.T-1): |
| 125 | + numerator += self.cal_ksi(i,j,t) |
| 126 | + denominator += self.cal_gamma(i,t) |
| 127 | + tmp_A[i][j] = numerator/denominator |
| 128 | + |
| 129 | + # b_{jk} |
| 130 | + for j in range(self.N): |
| 131 | + for k in range(self.M): |
| 132 | + numerator = 0.0 |
| 133 | + denominator = 0.0 |
| 134 | + for t in range(self.T): |
| 135 | + if k == self.O[t]: |
| 136 | + numerator += self.cal_gamma(j,t) |
| 137 | + denominator += self.cal_gamma(j,t) |
| 138 | + tmp_B[j][k] = numerator / denominator |
| 139 | + |
| 140 | + # pi_i |
| 141 | + for i in range(self.N): |
| 142 | + tmp_pi[i] = self.cal_gamma(i,0) |
| 143 | + |
| 144 | + self.A = tmp_A |
| 145 | + self.B = tmp_B |
| 146 | + self.Pi = tmp_pi |
| 147 | + |
| 148 | + def generate(self, length): |
| 149 | + import random |
| 150 | + I = [] |
| 151 | + |
| 152 | + # start |
| 153 | + ran = random.randint(0,1000)/1000.0 |
| 154 | + i = 0 |
| 155 | + while self.Pi[i]<ran or self.Pi[i]<0.0001: |
| 156 | + ran -= self.Pi[i] |
| 157 | + i += 1 |
| 158 | + I.append(i) |
| 159 | + |
| 160 | + # 生成状态序列 |
| 161 | + for i in range(1,length): |
| 162 | + last = I[-1] |
| 163 | + ran = random.randint(0, 1000) / 1000.0 |
| 164 | + i = 0 |
| 165 | + while self.A[last][i] < ran or self.A[last][i]<0.0001: |
| 166 | + ran -= self.A[last][i] |
| 167 | + i += 1 |
| 168 | + I.append(i) |
| 169 | + |
| 170 | + # 生成观测序列 |
| 171 | + Y = [] |
| 172 | + for i in range(length): |
| 173 | + k = 0 |
| 174 | + ran = random.randint(0, 1000) / 1000.0 |
| 175 | + while self.B[I[i]][k] < ran or self.B[I[i]][k]<0.0001: |
| 176 | + ran -= self.B[I[i]][k] |
| 177 | + k += 1 |
| 178 | + Y.append(k) |
| 179 | + |
| 180 | + return Y |
| 181 | + |
| 182 | + |
| 183 | + |
| 184 | +def triangle(length): |
| 185 | + ''' |
| 186 | + 三角波 |
| 187 | + ''' |
| 188 | + X = [i for i in range(length)] |
| 189 | + Y = [] |
| 190 | + |
| 191 | + for x in X: |
| 192 | + x = x % 6 |
| 193 | + if x <= 3: |
| 194 | + Y.append(x) |
| 195 | + else: |
| 196 | + Y.append(6-x) |
| 197 | + return X,Y |
| 198 | + |
| 199 | +def sin(length): |
| 200 | + ''' |
| 201 | + 三角波 |
| 202 | + ''' |
| 203 | + import math |
| 204 | + X = [i for i in range(length)] |
| 205 | + Y = [] |
| 206 | + |
| 207 | + for x in X: |
| 208 | + x = x % 20 |
| 209 | + Y.append(int(math.sin((x*math.pi)/10)*50)+50) |
| 210 | + return X,Y |
| 211 | + |
| 212 | + |
| 213 | + |
| 214 | +def show_data(x,y): |
| 215 | + import matplotlib.pyplot as plt |
| 216 | + plt.plot(x, y, 'g') |
| 217 | + plt.show() |
| 218 | + |
| 219 | + return y |
| 220 | + |
| 221 | + |
| 222 | +if __name__ == '__main__': |
| 223 | + hmm = HMM(10,4) |
| 224 | + tri_x, tri_y = triangle(20) |
| 225 | + |
| 226 | + hmm.train(tri_y) |
| 227 | + y = hmm.generate(100) |
| 228 | + x = [i for i in range(100)] |
| 229 | + show_data(x,y) |
| 230 | + |
| 231 | + # hmm = HMM(15,101) |
| 232 | + # sin_x, sin_y = sin(40) |
| 233 | + # show_data(sin_x, sin_y) |
| 234 | + # hmm.train(sin_y) |
| 235 | + # y = hmm.generate(100) |
| 236 | + # x = [i for i in range(100)] |
| 237 | + # show_data(x,y) |
| 238 | + |
0 commit comments