如何用 Next.js v14 实现一个 Streaming 接口?

3,364 阅读8分钟

本文为稀土掘金技术社区首发签约文章,30天内禁止转载,30天后未获授权禁止转载,侵权必究!

前言

什么是 Streaming ?

百度百科倒是有关于 Streaming Media 的解释:

流媒体(streaming media)是指将一连串的媒体数据压缩后,经过网上分段发送数据,在网上即时传输影音以供观赏的一种技术与过程,此技术使得数据包得以像流水一样发送;如果不使用此技术,就必须在使用前下载整个媒体文件。

放到 HTTP 请求上,Streaming 差不多也是同样的意思,将数据分段发送给客户端。

一个比较常见的应用是 ChatGPT 的打字效果:

如果我们要用 Next.js 实现一个具有 Streaming 效果的接口该如何实现呢?

注:本篇的最后我们会调用 OpenAI 的接口来实现这样一个效果。

Fetch API

我们先从 Fetch API 开始说起。

Fetch API 想必大家已经很熟悉了,我们常常会这样使用 fetch:

fetch("http://example.com/movies.json")
  .then((response) => response.json())
  .then((data) => console.log(data));

在这个例子中,我们获取了一个 JSON 文件并将其打印。为了获取 JSON 的内容,我们需要使用 json() 方法(该方法返回一个将 response body 解析成 JSON 的 promise)。

实际上,response 还有一个 body 只读属性,它是一个简单的 getter,用于暴露一个 ReadableStream 类型的 body 内容。简单来说,fetch 的 response.body 会返回一个流类型的内容。这样的设计在请求大体积文件的时候会很有用。

ReadableStream

如何读取

response.body 返回一个 ReadableStream 类型的内容。ReadableStream 有一个 getReader() 实例方法,它会创建一个读取器并将流锁定。一旦流被锁定,其他读取器将不能读取它,直到它被释放。

读取器是 ReadableStreamDefaultReader 类型,它是用于读取来自网络提供的流数据(例如 fetch 请求)的默认 reader。它有一个 read() 实例方法,返回一个 Promise,提供对流内部队列中下一个分块的访问权限。而这个 Promise 的值,其 resolve / reject 的结果取决于流的状态:

  • 如果有 chuck 可用,则 promise 将使用 { value: theChunk, done: false } 形式的对象来 resolve。
  • 如果流已经关闭,则 promise 将使用 { value: undefined, done: true } 形式的对象来 resolve。
  • 如果流发生错误,promise 将因相关错误被拒绝。

根据 MDN ReadableStream.getReader() 中的例子尝试读取一下 ReadableStream 中的内容:

const decoder = new TextDecoder('utf-8');
fetch('https://api.thecatapi.com/v1/images/search')
  .then((response) => response.body)
  .then((body) => {
    const reader = body.getReader();
    reader.read().then(function process({ done, value }) {
      if (done) {
        console.log('Stream finished');
        return;
      }
      const text = decoder.decode(value);
      console.log('Received data chunk', text);

      return reader.read().then(process);
    });
  })

这里我们写了一个递归,不断读取流中的内容,直到流关闭(done 为 true)。我们可以复制这段代码,在浏览器中运行,效果如下:

image.png

因为接口本身不是流式,所以这里只有一个 chunk,正是接口的返回内容。

如何创建

知道了如何读取,我们又该如何创建一个 ReadableStream 类型的内容呢?

可以使用 ReadableStream() 构造函数,示例代码如下:

const stream = new ReadableStream(
  {
    start(controller) {},
    pull(controller) {},
    cancel() {},
    type,
    autoAllocateChunkSize,
  }
);

构造函数第一个参数对象包含着五个属性,仅有第一个是必要的:

  • start(controller) —— 一个在 ReadableStream 构建后,立即被调用一次的方法。在这个方法中,你应该包含设置流功能的代码,例如开始生成数据或者以其他的方式访问资源时

  • pull(controller) —— 一个方法,当被包含时,它会被重复的调用直到填满流的内置队列。当排入更多的分块时,这可以用于控制流

  • cancel() —— 一个方法,当被包含时,如果应用发出流将被取消的信号,它将被调用(例如,调用 ReadableStream.cancel())。内容应该采取任何必要的措施释放对流源的访问

  • typeautoAllocateChunkSize —— 当它们被包含时,会被用来表示流将是一个字节流。字节流将在未来的教程中单独涵盖,因为它们在目的和用例上与常规的(默认的)流有些不同。它们也未在任何地方实施。

让我们写个例子熟悉一下:

fetch('https://mdn.github.io/dom-examples/streams/simple-pump/tortoise.png')
  .then(response => response.body)
  .then(rs => {
    const reader = rs.getReader();

    return new ReadableStream({
      async start(controller) {
        while (true) {
          const { done, value } = await reader.read();

          if (done) {
            break;
          }

          controller.enqueue(value);
        }

        controller.close();
        reader.releaseLock();
      }
    })
  })
  .then(rs => new Response(rs))
  .then(response => response.blob())
  .then(blob => URL.createObjectURL(blob))
  .then(url => {
    var img = new Image(); 
    img.src = url;  
    document.body.append(img)
  })
  .catch(console.error);

在这个例子中,我们 fetch 了一张图片,先用读取器访问流,根据流的内容创建新的流文件(相当于拷贝一遍),然后获取新的流文件,最终将其转换为图片元素,添加到 body 中。我们复制这段代码到浏览器中,效果如下:

openai-4gif.gif

在这段代码中,可能有点困惑的是 start 函数的第一个参数 controller,它是 ReadableStreamDefaultController 类型,用于控制 ReadableStream 的状态和内部队列。

简单来说,它有三个实例方法,close() 用于关闭流,enqueue() 用于加入流,error() 用于报错。

Next.js 实现 Streaming

基础示例

有了这些基础知识,让我们用 Next.js 实现一个 Streaming 接口吧。

使用官方脚手架创建一个 Next.js 项目:

npx create-next-app@latest

运行效果如下:

image.png

为了样式美观,我们会用到 Tailwind CSS,所以注意勾选 Tailwind CSS,其他随意。

新建 api/chat/route.js,代码如下:

function iteratorToStream(iterator) {
  return new ReadableStream({
    async pull(controller) {
      const { value, done } = await iterator.next()
 
      if (done) {
        controller.close()
      } else {
        controller.enqueue(value)
      }
    },
  })
}
 
function sleep(time) {
  return new Promise((resolve) => {
    setTimeout(resolve, time)
  })
}
 
const encoder = new TextEncoder()
 
async function* makeIterator() {
  yield encoder.encode('<p>One</p>')
  await sleep(1000)
  yield encoder.encode('<p>Two</p>')
  await sleep(1000)
  yield encoder.encode('<p>Three</p>')
}
 
export async function GET() {
  const iterator = makeIterator()
  const stream = iteratorToStream(iterator)
 
  return new Response(stream)
}

这段代码是 Next.js 官方提供的关于使用底层 API 实现 Streaming 的示例代码,其中又参考了 MDN ReadableStream 的示例代码。代码逻辑并不复杂,主要功能是在运行迭代器,不断将内容推到流中。为了效果明显,我们加了 sleep 函数。

本地运行 npm run dev,此时访问 http://localhost:3000/api/chat,效果如下:

openai-5.gif

注意:Next.js 开发模式默认开启 React Strict Mode,这会导致请求调用两次,影响这里的结果。你可以在 next.config.js配置中关闭 React Strict Mode:

const nextConfig = {
  reactStrictMode: false,
};

export default nextConfig;

随着请求的持续连接,这些内容会间隔 1s 出现。查看此接口的响应头:

截屏2024-03-09 21.33.27.png

请求之所以能够持续返回数据,也是得益于 HTTP 的 Transfer-Encoding 标头的值为 chunked,表示数据将以一系列分块的形式进行发送。

分块传输编码(Chunked transfer encoding)是超文本传输协议(HTTP)中的一种数据传输机制,允许 HTTP由网页服务器发送给客户端应用( 通常是网页浏览器)的数据可以分成多个部分。分块传输编码只在 HTTP 协议1.1版本(HTTP/1.1)中提供。

接口写好了,前端又该如何调用呢?

这个时候就要用到前面讲 ReadableStream 读取的内容了。修改 app/page.js,代码如下:

'use client'

const decoder = new TextDecoder('utf-8');
import { useEffect, useState } from "react";

export default function Chat() {

  const [text, setText] = useState('')

  useEffect(() => {

    const fetchData = async () => {
      const response = await fetch("http://localhost:3000/api/chat");
      const reader = response.body.getReader();
      reader.read().then(function process({ done, value }) {
        if (done) {
          console.log('Stream finished');
          return;
        }
        const text = decoder.decode(value);
        console.log('Received data chunk', text);

        setText((value) => {
          return value + text
        })
    
        return reader.read().then(process);
      });
    }

    fetchData()
  }, [])

  return (
    <div className="flex flex-col w-full max-w-md py-24 mx-auto stretch">
      {text}
    </div>
  );
}

刷新页面,交互效果如下:

openai-7.gif

调用 OpenAI 接口

写 Streaming 接口,一个很常见的应用是后端调用大模型的接口,比如 OpenAI 的接口:

import OpenAI from "openai";

const openai = new OpenAI();

async function main() {
    const stream = await openai.chat.completions.create({
        model: "gpt-4",
        messages: [{ role: "user", content: "Say this is a test" }],
        stream: true,
    });
    for await (const chunk of stream) {
        process.stdout.write(chunk.choices[0]?.delta?.content || "");
    }
}

main();

让我们实现一下开头的那个效果。

为此你需要准备一个 OpenAI API 3.5 的 KEY。修改 api/chat/route.js,代码如下:

import OpenAI from 'openai';

const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY || '',
  baseURL: "https://api.openai-proxy.com/v1"
});

const encoder = new TextEncoder()

async function* makeIterator(response) {
  for await (const chunk of response) {
    const delta = chunk.choices[0].delta.content

    yield encoder.encode(delta)
  }
}

function iteratorToStream(iterator) {
  return new ReadableStream({
    async pull(controller) {
      const { value, done } = await iterator.next()
 
      if (done) {
        controller.close()
      } else {
        controller.enqueue(value)
      }
    },
  })
}

export async function POST(req) {
  const { messages } = await req.json();
  const response = await openai.chat.completions.create({
    model: 'gpt-3.5-turbo',
    stream: true,
    messages,
  });

  return new Response(iteratorToStream(makeIterator(response)))
}

新建 .env.local,代码如下:

OPENAI_API_KEY=sk-xxxxxxxxxxxxxxxxxxxxxxxxxxx

修改 app/page.js,代码如下:

'use client';

import { useState, useEffect } from "react";
const decoder = new TextDecoder('utf-8');
export default function Chat() {
  const [text, setText] = useState('')
  const [input, setInput] = useState('')

  const handleInputChange = (e) => {
    setInput(e.target.value)
  }

  const handleSubmit = async (e) => {
    e.preventDefault()
    setText('')
    setInput('')
    
    const response = await fetchData(input)
    const reader = response.body.getReader();
    
    reader.read().then(function process({ done, value }) {
      if (done) {
        console.log('Stream finished');
        return;
      }
      const text = decoder.decode(value);
      console.log('Received data chunk', text);

      setText((value) => {
        return value + text
      })
  
      return reader.read().then(process);
    });
    
  }

  const fetchData = async (input) => {
    const response = await fetch("http://localhost:3000/api/chat", {
      method: "POST",
      body: JSON.stringify({messages: [{ role: "user", content: input }]})
    });
    return response
  }

  return (
    <div className="flex flex-col w-full max-w-md p-2 mx-auto stretch">
        <div className="whitespace-pre-wrap">
          {text ? 'AI: ' + text : ''}
        </div>
      <form onSubmit={handleSubmit}>
        <input
          className="fixed bottom-0 w-full max-w-md p-2 mb-8 border border-gray-300 rounded shadow-xl"
          value={input}
          placeholder="Say something..."
          onChange={handleInputChange}
        />
      </form>
    </div>
  );
}

交互效果如下:

openai-9.gif

从右侧浏览器的打印中,我们也可以看出,随着内容的不断返回,React 在不断的渲染内容,这才实现了打字流的效果。

总结

本篇我们从 fetch API 开始讲起,response.body 返回的正是一个 ReadableStream 类型的只读流。接下来我们讲了 ReadableStream 中的内容如何读取以及 ReadableStream 如何创建。借助底层 API 实现流的时候,就需要通过创建 ReadableStream 的方式来实现。

最后我们讲了两个全栈示例,后端接口如何创建,前端又该如何调用,希望对大家业务中实现 Streaming 有借鉴意义。

PS:学习 Next.js,欢迎入手小册《Next.js 开发指南》,基础篇、实战篇、源码篇、面试篇四大篇章带你系统掌握 Next.js!

参考链接

  1. developer.mozilla.org/zh-CN/docs/…
  2. developer.mozilla.org/zh-CN/docs/…