阅读 587

用TensorFlow和Keras实现文本生成器 from TF Dev Summit '20

前言

发现又到要交作业的时候,本来打算写微前端的文章,但是评估过内容深度(chang du)和落地能力(mei xie hao demo)之后,转而先写深度学习相关的文章。目前 NLP 正处于寒武纪爆发阶段,我们有足够的数据和足够的工具,本文将讨论如何用 TensorFlow 2.0 实现文本生成器的模型。

首先安装最新版的TF和Keras

!pip install tf-nightly
!pip install tensorflow-addons
!pip install keras-tuner
复制代码

获取训练数据

数据集来源

本文将使用 Facebook 发布的儿童图书测试语料库,该数据集是由一系列儿童读物的段落和填空题、段落问题构成的。这个模型,我们只需要使用到原始的书本文本,就跟读书一样。

读取数据集

  • 首先我们需要加载数据集
  • 数据集的地址是: http://www.thespermwhale.com/jaseweston/babi/CBTest.tgz
  • 下载好数据之后,当然就是加载数据打印几行内容出来康康
import tensorflow as tf
lines = tf.data.TextLineDataset('./CBTest/data/cbt_train.txt')
for line in lines.take(3):
 print(line)
复制代码

结果

tf.Tensor(b'_BOOK_TITLE_ : Andrew_Lang___Prince_Prigio.txt.out', shape=(), dtype=string)
tf.Tensor(b'CHAPTER I. -LCB- Chapter heading picture : p1.jpg -RCB- How the Fairies were not Invited to Court .', shape=(), dtype=string)
tf.Tensor(b'Once upon a time there reigned in Pantouflia a king and a queen .', shape=(), dtype=string)
复制代码

数据清洗

数据中的文章标题和标点符号是我们不需要的,所以需要去除。

  • 使用数据集就跟我们使用numpy和panda一样,我们可以过滤不需要的内容并且做映射转换。
  • 现在需要删除书名和标点符号
lines = lines.filter(
   lambda x: not tf.strings.regex_full_match(x, "_BOOK_TITLE_.*")
)

punctuation = r'[!"#$%&()\*\+,-\./:;<=>?@\[\\\]^_{|}~\']'
lines = lines.map(lambda x: tf.strings.regex_replace(x, punctuation, ' '))
复制代码

打印几行出来康康

words = lines.map(tf.strings.split)
wordsets = words.unbatch().batch(11)

for row in wordsets.take(3):
 print(row)
复制代码

结果:

tf.Tensor(b'CHAPTER I   LCB  Chapter heading picture   p1 jpg  RCB  How the Fairies were not Invited to Court  ', shape=(), dtype=string)
tf.Tensor(b'Once upon a time there reigned in Pantouflia a king and a queen  ', shape=(), dtype=string)
tf.Tensor(b'With almost everything else to make them happy   they wanted one thing   they had no children  ', shape=(), dtype=string)
复制代码

建立想要的Label

现在每一行都是十一个单词的集合,但是需要每一个的最后一个单词作为label。所以需要写一个function分割出label用于训练。

def get_example_label(row):
 example = tf.strings.reduce_join(row[:-1], separator = ' ')
 example = tf.expand_dims(example, axis = 0)
 label = row[-1:]
 return example, label

data = wordsets.map(get_example_label)
data = data.shuffle(1000)
for row in data.take(3):
 print(row)
复制代码

结果:

(<tf.Tensor: shape=(1,), dtype=string, numpy=
array([b'were quite unendurable that nothing was more rude than to'],
     dtype=object)>, <tf.Tensor: shape=(1,), dtype=string, numpy=array([b'be'], dtype=object)>)
(<tf.Tensor: shape=(1,), dtype=string, numpy=
array([b'garret The prince pushed in the door with some difficulty'],
     dtype=object)>, <tf.Tensor: shape=(1,), dtype=string, numpy=array([b'not'], dtype=object)>)
(<tf.Tensor: shape=(1,), dtype=string, numpy=
array([b'Now as Pantouflia was a rich lazy country which hated'],
     dtype=object)>, <tf.Tensor: shape=(1,), dtype=string, numpy=array([b'fighting'], dtype=object)>)
复制代码

可是现在还不够,要想要用数据集输入模型进行训练,必须把数据变成矩阵。 使用 TensorFlow 2.0 新功能,预处理层,将数据集变成矩阵

max_features = 5000  # Maximum vocab size.

vectorize_layer = tf.keras.layers.experimental.preprocessing.TextVectorization(
 max_tokens=max_features,
 output_sequence_length=10)

vectorize_layer.adapt(lines.batch(64))
vectorize_layer.get_vocabulary()[:5]
vectorize_layer.get_vocabulary()[-5:]
for batch in data.batch(3).take(1):
 print(batch[0])
 print(vectorize_layer(batch[0]))
复制代码

结果:

tf.Tensor(
[[b'behind him and colder in front of him He looked']
[b'of hunting herself And the prince said Oh if you']
[b'devastated his country were actually dead But when he had']], shape=(3, 1), dtype=string)
tf.Tensor(
[[ 250   26    3    1   11  689    6   26    8  119]
[   6  909  234    3    2  233   27  146   40   13]
[   1   14  476   43 1614  340   21   36    8   17]], shape=(3, 10), dtype=int64)
复制代码

搭建seq2seq模型

文本生成模型使用seq2seq模型构建

模型包含两部分,第一部分使用RNN进行编码的编码器,第二部分是输入数据和编码状态的解码器预测正确的数据。seq2seq手写的话,会很复杂,很多参数需要监听。 但是现在可以使用TF的 seq2seq 插件减少复杂度。

编码器

  • 输入的数据转化成索引
  • 变成矩阵的数据输入之后,传入 embedding ,再到 LSTM

解码器

  • 使用 TF AddOn 采样解码器和 LSTM 解码器连接到投影层
  • 映射到字典的密集层
  • 使用两个 token 进行预测
  • 最后使用注意力机制去监听模型训练的中间状态,让模型有更好的权重

覆盖train_step

  • 覆盖TF fit 中的train_step函数,让我们可以自定义数据的走向
  • 进行梯度下降,和向前传播
import tensorflow as tf
import tensorflow_addons as tfa
tf.__version__
dir(tfa.seq2seq)
class EncoderDecoder(tf.keras.Model):
  def __init__(self, max_features=5000, embedding_dims=200, rnn_units=1024):
    super().__init__()
    self.max_features = max_features
    self.vectorize_layer = tf.keras.layers.experimental.preprocessing.TextVectorization(
        max_tokens=max_features,
        output_sequence_length=10)
    self.encoder_embedding = tf.keras.layers.Embedding(
        max_features + 1, embedding_dims)
    self.lstm_layer = tf.keras.layers.LSTM(rnn_units, return_state=True)

    self.decoder_embedding = tf.keras.layers.Embedding(
        max_features + 1, embedding_dims)
    sampler = tfa.seq2seq.sampler.TrainingSampler()
    decoder_cell = tf.keras.layers.LSTMCell(rnn_units)
    projection_layer = tf.keras.layers.Dense(max_features)
    self.decoder = tfa.seq2seq.BasicDecoder(
        decoder_cell, sampler, output_layer=projection_layer)
    
    self.attention = tf.keras.layers.Attention()

  def train_step(self, data):
    x, y = data[0], data[1]
    x = self.vectorize_layer(x)
    # The vectorize layer pads, but we only need the first val for labels
    y = self.vectorize_layer(y)[:, 0:1]
    y_one_hot = tf.one_hot(y, self.max_features)

    with tf.GradientTape() as tape:
      embedded_inputs = self.encoder_embedding(x)
      encoder_outputs, state_h, state_c = self.lstm_layer(embedded_inputs)
      
      attn_output = self.attention([encoder_outputs, state_h])
      attn_output = tf.expand_dims(attn_output, axis=1)
      
      targets = self.decoder_embedding(tf.zeros_like(y))
      concat_output = tf.concat([targets, attn_output], axis=-1)
      outputs, _, _ = self.decoder(
          concat_output, initial_state=[state_h, state_c])
      
      y_pred = outputs.rnn_output
      
      loss = self.compiled_loss(
          y_one_hot, 
          y_pred, 
          regularization_losses=self.losses)
    
    trainable_variables = self.trainable_variables
    gradients = tape.gradient(loss, trainable_variables)
    self.optimizer.apply_gradients(zip(gradients, trainable_variables))

    self.compiled_metrics.update_state(y_one_hot, y_pred)
    return {m.name: m.result() for m in self.metrics}

  def predict_step(self, data, select_from_top_n=1):
    x = data
    if isinstance(x, tuple) and len(x) == 2:
      x = x[0]
    x = self.vectorize_layer(x)
    embedded_inputs = self.encoder_embedding(x)
    encoder_outputs, state_h, state_c = self.lstm_layer(embedded_inputs)
    attn_output = self.attention([encoder_outputs, state_h])
    attn_output = tf.expand_dims(attn_output, axis=1)
    
    targets = self.decoder_embedding(tf.zeros_like(x[:, -1:]))
    concat_output = tf.concat([targets, attn_output], axis=-1)
    outputs, _, _ = self.decoder(
        concat_output, initial_state=[state_h, state_c])
    
    y_pred = tf.squeeze(outputs.rnn_output, axis=1)
    top_n = tf.argsort(
        y_pred[:, 2:], axis=1, direction='DESCENDING')[: ,:select_from_top_n]
    chosen_indices = tf.random.uniform(
        [top_n.shape[0], 1], minval=0, maxval=select_from_top_n, 
        dtype=tf.dtypes.int32)
    counter = tf.expand_dims(tf.range(0, top_n.shape[0]), axis=1)
    indices = tf.concat([counter, chosen_indices], axis=1)
    choices = tf.gather_nd(top_n, indices)
    words = [self.vectorize_layer.get_vocabulary()[i] for i in choices]
    return words

  def predict(self, starting_string, num_steps=50, select_from_top_n=1):
    s = tf.compat.as_bytes(starting_string).split(b' ')
    for _ in range(num_steps):
      windowed = [b' '.join(s[-10:])]
      pred = self.predict_step([windowed], select_from_top_n=select_from_top_n)
      s.append(pred[0])
    return b' '.join(s)
复制代码

选择损失函数和优化函数

model = EncoderDecoder()
model.compile(
    loss=tf.keras.losses.CategoricalCrossentropy(from_logits=True), 
    optimizer='adam', 
    metrics=['accuracy'])
model.vectorize_layer.adapt(lines.batch(256))
复制代码

训练!干!(训练四十轮,这时间可以去喝杯奶茶)

model.fit(data.batch(256), epochs=30, callbacks=[tf.keras.callbacks.ModelCheckpoint('text_gen_ckpt')])
model.fit(data.batch(256), epochs=10, callbacks=[tf.keras.callbacks.ModelCheckpoint('text_gen_ckpt')])
复制代码

加载模型,生成文本(预测)

model.load_weights('text_gen_ckpt')
print(model.predict('The mouse and the rabbit went in together'))

print(model.predict('Once upon a time there was a Queen named Darling'))
print(model.predict('In a city far from here the teacup shook upon the table'))

print(model.predict('It was a strange and quiet theater and the people watched from home'))
复制代码

训练效果还可以更好

使用 keras-tuner 做超参数搜索,找出最优参数,可以更好地拟合模型 (一边跑模型,一边调参)

import kerastuner as kt

def build_model(hp):
  model = EncoderDecoder(
      rnn_units=hp.Int('units', min_value=256, max_value=1200, step=256))
  
  model.compile(
      optimizer=tf.keras.optimizers.Adam(
            hp.Choice('learning_rate', values=[1e-3, 1e-4, 3e-4])),
      loss=tf.keras.losses.CategoricalCrossentropy(from_logits=True),                 
      metrics=['accuracy'])
  
  model.vectorize_layer.adapt(lines.batch(256))
  return model

tuner = kt.tuners.RandomSearch(
    build_model,
    objective='accuracy',
    max_trials=15,
    executions_per_trial=1,
    directory='my_dir',
    project_name='text_generation')

tuner.search(
    data.batch(256), 
    epochs=10, 
    callbacks=[tf.keras.callbacks.ModelCheckpoint('text_gen_ckpt')])
复制代码

再康康优化过的效果

model.load_weights('text_gen_ckpt')
print(model.predict('The mouse and the rabbit went in together'))

print(model.predict('Once upon a time there was a Queen named Darling'))
print(model.predict('In a city far from here the teacup shook upon the table'))

print(model.predict('It was a strange and quiet theater and the people watched from home'))
复制代码