隐马尔可夫分词

2,005 阅读5分钟

前言

虽然目前 nlp 很多任务已经发展到了使用深度学习的循环神经网络模型和注意力模型,但传统的模型咱们也一样要了解。这里看下如何使用隐马尔科夫模型(HMM)进行分词。

隐马尔科夫模型

隐马尔科夫模型是一种有向图模型,图模型能清晰表达变量相关关系的概率,常见的图模型还有条件随机场,节点表示变量,节点之间的连线表示两者相关概率。用以下定义来描述HMM模型,

设系统所有可能的状态集合为S = \{s_1,s_2,...s_n\},所有能观察的对象的集合V=\{v_1,v_2,...v_m\},那么就一共有n种隐状态和m种显状态。

再设总共T个时刻的状态序列E = (e_1,e_2,...e_T),对应有T个时刻的观察序列O = (o_1,o_2,...o_T),这两个很容易理解,对应到nlp的词性标注中就是一句话和这句话的词性标注,比如“我/是/中国/人”和“代词/动词/名词/名词”。

隐马尔科夫模型主要有三组概率:转移概率、观测概率和初始状态概率。

系统状态之间存在着转移概率,设一个转移矩阵A = [a_{ij}]_{n \times n},因为有n中隐状态,所以是n行n列。概率的计算公式为,

a_{ij} = P(e_{t+1} = s_j | e_t =s_i) ,\quad  1 \le i,j \le n

此表示任意时刻t的状态若为s_i,则下一时刻状态为s_j的概率,即任意时刻两种状态的转移概率了。

观测概率矩阵为B = [b_{ij}]_{n \times m},其中b_{ij} = P(o_t=v_j|e_t=s_i), \quad 1 \le i\le n,1 \le j\le m,它用于表示在任意时刻t,若状态为s_i,则生成观察状态v_k的概率。

除此之外还有一个初始状态概率为\pi =(\pi_1,\pi_2,...,\pi_n),用于表示初始时刻各状态出现的概率,其中\pi_i=P(e_1=s_i),\quad i=1,2,...,n,即t=1时刻状态为s_i的概率。

综上所述,一个隐马尔科夫模型可以用\lambda = [A,B,\pi]描述。

image

序列标签

给训练数据打上标签,可以指定四类标签:BMES。其中 B 代表词的开始,M 代表词的中间,E 代表词的结尾,S代表单字词。比如下面

农B业E生B产E再B次E获B得E好S的S收B成E

完成标签后即得到了观察序列和状态序列,隐马尔科夫模型的五要素得到了两个,通过这两个要素可以将转移概率矩阵、观察概率矩阵和初始状态概率计算出来。

实现代码

先定义训练文件读取函数,这里字与标签之间以\t分割,读取过程顺便统计观察对象集合和状态集合。

def read_data(filename):
    sentences = []
    sentence = []
    with open(filename, 'r', encoding='utf-8') as f:
        for line in f.readlines():
            word_label = line.strip().split('\t')
            if len(word_label) == 2:
                observation_set.add(word_label[0])
                state_set.add(word_label[1])
                sentence.append(word_label)
            else:
                sentences.append(sentence)
                sentence = []
    return sentences

定义训练函数,读取训练语料后遍历所有句子,统计观察概率矩阵observation_matrix,统计初始状态概率pi_state,统计转移概率矩阵transition_matrix,这里我们先统计个数,后面对其进行归一化后得到真正的概率。所以可以看到后面分别对转移概率矩阵、观察概率矩阵和初始状态做归一化处理。

def train():
    print('begin training......')
    sentences = read_data(data_path)
    for sentence in sentences:
        pre_label = -1
        for word, label in sentence:
            observation_matrix[label][word] = observation_matrix.setdefault(label, {}).setdefault(word, 0) + 1
            if pre_label == -1:
                pi_state[label] = pi_state.setdefault(label, 0) + 1
            else:
                transition_matrix[pre_label][label] = transition_matrix.setdefault(pre_label, {}).setdefault(label,
                                                                                                             0) + 1
            pre_label = label

    for key, value in transition_matrix.items():
        number_total = 0
        for k, v in value.items():
            number_total += v
        for k, v in value.items():
            transition_matrix[key][k] = 1.0 * v / number_total
    for key, value in observation_matrix.items():
        number_total = 0
        for k, v in value.items():
            number_total += v
        for k, v in value.items():
            observation_matrix[key][k] = 1.0 * v / number_total

    number_total = sum(pi_state.values())
    for k, v in pi_state.items():
        pi_state[k] = 1.0 * v / number_total

    print('finish training.....')
    save_model()

训练后将模型参数保存到文件中,预测时加载指定文件的模型。

def load_model():
    print('loading model...')
    with open(model_path, 'rb') as f:
        model = pickle.load(f)
    return model

def save_model():
    print('saving model...')
    model = [transition_matrix, observation_matrix, pi_state, state_set, observation_set]
    with open(model_path, 'wb') as f:
        pickle.dump(model, f)

训练完成后得到模型参数,接着定义预测函数,我们预测其实就是 HMM 的解码问题,一般可以使用 Viterbi 算法,详细可以参考前面写的文章《隐马尔可夫模型的Viterbi解码算法》。

随着时刻推进,每个节点都保存了前一时刻所有节点到该节点的最优值的子路径,最优值通过上一时刻节点上的累乘概率乘以当前时刻概率来判断的。当前时刻的某一节点可能的路径为上一时刻所有节点到该节点的路径,但我们只保留其中一条最优路径,。依次计算完所有步后,最后通过回溯的方法得到整个过程的最优路径。

def predict():
    text = '我在图书馆看书'
    min_probability = -1 * float('inf')
    words = [{} for _ in text]
    path = {}
    for state in state_set:
        words[0][state] = 1.0 * pi_state.get(state, default_probability) * observation_matrix.get(state, {}).get(
            text[0],
            default_probability)
        path[state] = [state]
    for t in range(1, len(text)):
        new_path = {}
        for state in state_set:
            max_probability = min_probability
            max_state = ''
            for pre_state in state_set:
                probability = words[t - 1][pre_state] * transition_matrix.get(pre_state, {}).get(state,
                                                                                                 default_probability) \
                              * observation_matrix.get(state, {}).get(text[t], default_probability)
                max_probability, max_state = max((max_probability, max_state), (probability, pre_state))
            words[t][state] = max_probability
            tmp = copy.deepcopy(path[max_state])
            tmp.append(state)
            new_path[state] = tmp
        path = new_path
    max_probability, max_state = max((words[len(text) - 1][s], s) for s in state_set)
    result = []
    p = re.compile('BM*E|S')
    for i in p.finditer(''.join(path[max_state])):
        start, end = i.span()
        word = text[start:end]
        result.append(word)
    print(result)

大致描述下解码过程,初始状态时,对于四种状态,“我”最大概率都是 S;接着分别计算“我”的四种状态到“在”的四种状态的概率,此时最大的也都为 S;继续分别计算“在”的四种状态到“图”的四种状态的概率,最大的都为 B;直到分别计算“馆”的四种状态到“看”的四种状态的概率时,最大概率不同了;

'M': ['S']
'B': ['S']
'E': ['S']
'S': ['S']
......
'M': ['S', 'S', 'B', 'M', 'E', 'B', 'M']
'B': ['S', 'S', 'B', 'M', 'E', 'S', 'B']
'E': ['S', 'S', 'B', 'M', 'E', 'B', 'E']
'S': ['S', 'S', 'B', 'M', 'E', 'S', 'S']

github

github.com/sea-boat/nl…

-------------推荐阅读------------

我的开源项目汇总(机器&深度学习、NLP、网络IO、AIML、mysql协议、chatbot)

为什么写《Tomcat内核设计剖析》

我的2017文章汇总——机器学习篇

我的2017文章汇总——Java及中间件

我的2017文章汇总——深度学习篇

我的2017文章汇总——JDK源码篇

我的2017文章汇总——自然语言处理篇

我的2017文章汇总——Java并发篇


跟我交流,向我提问:

欢迎关注: