阅读 888

像Event Emitter一样使用Web Worker

本文转载自 saul-mirone.github.io/2019/12/17/…

Web Worker可以在浏览器中添加可以和主线程通信的独立运行的线程。通过将可能阻塞主线程的大量计算移入web worker,我们可以保证主线程的流畅性,但是web worker默认的调用使用较为繁琐,因此我们可以按照自己的需求进行一些封装,本文探讨一种基于事件的封装模式。

原始调用

在开始之前,让我们看看直接使用web worker中的API大概是什么样的。 假设我们现在使用json来传递数据。

  • 在worker中:
self.addEventListener('message', (e) => {
  const { data } = e;
  if (!data) return;
  dataHandler(data);
});
复制代码
  • 在主线程中:
const worker = new Worker('path/to/worker');
worker.postMessage({ data: 'some data' });
复制代码

看起来很清晰明了,似乎不需要什么封装。

但是当传递的数据量增多,频率上升的时候,会出现明显的性能下降。

传递引用

Web worker在线程间传递数据时,有两种方法:

  1. 结构化克隆:默认的做法,clone一份数据给接受数据的线程,而不是共享实例。因此如果数据量很大,clone的成本也会随之增高。
  2. 移交:传递实现了Transferable接口的数据时, 可以使用这种方式。数据会被移交到目标线程的上下文中,不存在复制,因此性能会得到比较明显的提高。

具体可以参见谷歌的文档

目前实现了Transferable接口的数据类型包括: ArrayBuffer, MessagePort, ImageBitmap, OffscreenCanvas, 因此对于我们当前传递JSON结构数据这一场景, 使用ArrayBuffer是最好的选择。

因此我们需要实现一对encodedecode方法来把数据结构转换为ArrayBuffer

  • encode:
function encode<T>(data: T): Uint16Array {
  const str = JSON.stringify(data);
  const buf = new ArrayBuffer(str.length * 2);
  const bufView = new Uint16Array(buf);
  bufView.set(str.split("").map((_, i) => str.charCodeAt(i)));
  return bufView;
}
复制代码
  • decode:
function decode<T = unknown>(buf: ArrayBufferLike): T {
  return JSON.parse(
    String.fromCharCode.apply(
      null,
      (new Uint16Array(buf) as unknown) as number[]
    )
  );
}
复制代码

于是我们的代码就变成了:

  • 在worker中:
self.addEventListener('message', (e) => {
  const { data } = e;
  if (!data) return;
  dataHandler(decode(data));
});
复制代码
  • 在主线程中:
const worker = new Worker('path/to/worker');
const arrayBuffer = encode(data);
worker.postMessage(arrayBuffer.buffer, [arrayBuffer.buffer]);
复制代码

为每次调用提供响应

在web worker中,主线程向worker线程中发送了消息之后,就无法再追踪这条消息的状态了, 只能通过子线程主动调用postMessage将状态告知主线程。

假设我们现在想在子线程调用结束后让主线程得到通知,我们可以给每条消息追加一个id,通过这条id来追踪一次调用的状态:

  • 在worker中:
self.addEventListener('message', (e) => {
  const { data } = e;
  if (!data) return;
  const returnMessage = dataHandler(decode(data.message));
  const arrayBuffer = encode({ id: data.id, message: returnMessage });
  self.postMessage(arrayBuffer.buffer, [arrayBuffer.buffer]);
});
复制代码
  • 在主线程中:
const worker = new Worker('path/to/worker');
const id: string = uuid();
const arrayBuffer = encode({ id, message: data });
worker.postMessage(arrayBuffer.buffer, [arrayBuffer.buffer]);

const id2: string = uuid();
const arrayBuffer2 = encode({ id: id2, message: data });
worker.postMessage(arrayBuffer2.buffer, [arrayBuffer2.buffer]);
worker.onmessage = (e) => {
  const { data } = e;
  if (!data) return;
  const returnData = decode(data);
  if (id === returnData.id) {
    callback1(returnData.message);
  }
  
  if (id2 === message.id) {
    callback2(returnData.message);
  }
}
复制代码

可以看到,我们需要手动管理每次消息传递的编解码和回调映射,写起来十分繁琐。 下面我们对通用的处理进行一些封装。

基于Promise的worker调用

观察代码,当我们每次发送消息给worker的时候,总是需要构造消息的id,并添加对应的处理回调。 和Promise完美契合。

// 主线程调用的构造类
class PromiseWorker {
  private messageMap: Map<string, Function> = new Map();
  constructor(private readonly worker: Worker) {
    worker.onmessage = e => {
      const { data } = e;
      if (!data) return;
      const { id, message } = decode(data);
      const res = this.messageMap.get(id);
      if (!res) return;
      res(message);
      this.messageMap.delete(id);
    }
  }

  emit<T, U>(message: T): Promise<U> {
    return new Promise(resolve => {
      const id = uuid();
      const data = encode({ id, message });
      this.messageMap.set(id, resolve);
      this.worker.postMessage(data.buffer, [data.buffer]);
    });
  }
}
// 子线程调用的注册方法
function register(handler: Function) {
  const post = (message) => {
    const data = encode(message);
    self.postMessage(data.buffer, [data.buffer]);
  }
  self.onmessage = async (e: MessageEvent) => {
    const { data } = e;
    if (!data) return;

    const { id, message } = decode(data);

    const result = (await mapping[type](message)) || "done";
    post({ id, message: result });
  };
}
复制代码

使用时:

  • 在worker中:
register(async (message) => {
  const data = await someFetch(message);
  return someHandler(data);
})
复制代码
  • 在主线程中:
const worker = new Worker('path/to/worker');
const promiseWorker = new PromiseWorker(worker);
promiseWorker.emit(data).then(result => console.log(result));
复制代码

非常方便。

实现事件风格的调用方式

PromiseWorker已经十分好用了,但是当我们需要给发送的消息进行分类,并按不同类型响应的时候,难免有一些模板代码:

  • 在worker中:
register(async (message) => {
  switch (message.type) {
    case 'ACTION_A':
      return handler1(message.data);
    case 'ACTION_B':
      return handler2(message.data);
    default:
      return handler3(message.data);
  }
})
复制代码
  • 在主线程中:
const worker = new Worker('path/to/worker');
const promiseWorker = new PromiseWorker(worker);
promiseWorker.emit({ type: 'ACTION_A', data: dataA }).then(result => console.log(result));
promiseWorker.emit({ type: 'ACTION_B', data: dataB }).then(result => console.log(result));
复制代码

因此我们可以用事件模型来对消息进行分类,按类别进行响应,其实只需要给消息模型中添加type字段就好了。

class WorkerEmitter {
  private messageMap: Map<
    string,
    { callback: Function; type: string | number }
  > = new Map();
  constructor(private readonly worker: Worker) {
    worker.onmessage = e => {
      const { data } = e;
      if (!data) return;

      const { id, message } = decode(data);
      const ret = this.messageMap.get(id);
      if (!ret) return;

      const { callback } = ret;

      callback(message);
      this.messageMap.delete(id);
    };
  }
  emit<T, U>(type: string | number, message: T): Promise<U> {
    return new Promise(resolve => {
      const id = uuid();
      const data = encode({
        id,
        type,
        message
      });
      this.messageMap.set(id, {
        type,
        callback: (x: U) => {
          resolve(x);
        }
      });
      this.worker.postMessage(data.buffer, [data.buffer]);
    });
  }
  terminate() {
    this.worker.terminate();
  }
}
type WorkerInstance = {
  on(type: string, handler: Function): void;
};
function register(): WorkerInstance {
  const mapping: Record<string, Function> = {};
  const post = (message: Data): void => {
    const data = encode(message);
    self.postMessage(data.buffer, [data.buffer]);
  };
  self.onmessage = async (e: MessageEvent) => {
    const { data } = e;
    if (!data) return;

    const { type, id, message } = decode(data);

    const result = (await mapping[type](message)) || "done";
    post({ id, type, message: result });
  };

  return {
    on: (type, handler) => {
      mapping[type] = handler;
    }
  };
}
复制代码

调用时:

  • 在worker中:
const worker = register();

worker.on('ACTION_A', handler1);
worker.on('ACTION_B', handler2);
复制代码
  • 在主线程中:
const worker = new Worker('path/to/worker');
const workerEmitter = new WorkerEmitter(worker);
workerEmitter.emit('ACTION_A', dataA).then(result => console.log(result));
workerEmitter.emit('ACTION_B', dataB).then(result => console.log(result));
复制代码

从此,使用web worker就可以像触发事件一样轻松了,以上源码可在Github上查阅。


我将这个封装发布在了npm上:worker-emitter,需要的话直接使用即可。


关注【IVWEB社区】公众号查看最新技术周刊,今天的你比昨天更优秀!


关注下面的标签,发现更多相似文章
评论