Node.js 多线程运行 Python 代码

avatar
阿里巴巴 前端委员会智能化小组 @阿里巴巴

文/ 阿里淘系 F(x) Team - 雷姆

故事的背景

假期前,有个用户吐槽 Boa(不知道Boa的小伙伴可以看下介绍),说: 在使用 Boa 的过程中,会阻塞 Node.js 主线程/进程,对于 Node.js 开发者来说,是非常难以接受的一件事


其实这在之前,有次团队内部讨论的过程中,也讨论过这个问题,我们曾经提议过两个方案。

方案一

将所有 Python 的函数都异步化到一个全新的线程,然后通过 uv worker queue 来进行通讯,但我极力地反对这个方案,因为这将会大大增加了 Boa 代码的复杂度,比如要获取一个简单的 sys.path 都需要通过 await 来完成,比如:

const boa = require('@pipcook/boa');
const sys = await boa.import('sys');
console.log(await sys.path);


这样做所付出的代价是非常昂贵的,所以因此放弃了。

方案二

提供一个单独的函数,比如 boa.newThread 这样的,然后将需要执行的代码放到这个新的线程中执行,但是这样做需要面临有两个问题,一个是增加新 API 对于用于的学习成本,另一个是 Python/CPython 自身的线程安全性,在不同的线程中使用同一个 Python 对象池的话,可能会导致一些数据一致性的问题,也正式因为这个原因没有被 CPython 彻底解决,导致即使 CPython 提供了新建解释器的接口,也需要通过 swap 来切换当前“进程”的解释器,也就是所同一个进程仍然只能执行一个 Python 代码。

在上述两个方案都被否决之后,这个问题就被我们搁置了有一段时间了,直到收到这个反馈,我再次开始尝试解决。

这次,我想通过 Node.js 的 worker_threads来提供 API,另外在了解了一些 Python 的线程和对象模型后,发现在 Boa 与正常 Python 的多线程使用场景上,有非常大的区别。


如何使用

我们先来看看实现后的版本,开发者如何使用。

const { Worker, isMainThread, workerData, parentPort } = require('worker_threads');
const boa = require('@pipcook/boa');
const pybasic = boa.import('tests.base.basic'); // a Python example
const { SharedPythonObject, symbols } = boa;
class Foobar extends pybasic.Foobar {
  hellomsg(x) {
    return `hello <${x}> on ${this.test}(${this.count})`;
  }
}
if (isMainThread) {
  const foo = new Foobar();
  const worker = new Worker(__filename, {
    workerData: {
      foo: new SharedPythonObject(foo),
    },
  });
  let alive = setInterval(() => {
    const ownership = foo[symbols.GetOwnershipSymbol]();
    console.log(`ownership should be ${expectedOwnership}.`);
  }, 1000);
  worker.on('message', state => {
    if (state === 'done') {
      console.log('task is completed');
      setTimeout(() => {
        clearInterval(alive);
        console.log(foo.ping('x'));
      }, 1000);
    }
  });
} else {
  const { foo } = workerData;
  console.log(`worker: get an object${foo} and sleep 5s in Python`);
  foo.sleep(); // this is a blocking function which is implemented at Python to sleep 1s
  
  console.log('python sleep is done, and sleep in nodejs(thread)');
  setTimeout(() => parentPort.postMessage('done'), 1000);
}


从上述代码中,boa 与 worker_threads 正常的用法并没有太大的区别,基本上是:

  • 通过 Worker 创建新的 JavaScript 线程
  • 通过 workerData 来传递数据,并且可以通过 Shared* 类共享对象


Boa 中需要允许直接传递任何 Python 对象(而在 JavaScript 中只能通过该算法传递复杂的 JavaScript 对象),在子线程执行结束后,我们也会为用户恢复源对象的使用。

我们在 Boa 中新增了一个 SharedPythonObject() 类,通过它便能实现上面说的,在工作线程中使用主线程中的对象,比如:

// 主线程
const shared = new SharedPythonObject(foobar);


接下来,将变量 shared 通过 workerData 传递给 Worker,在工作线程中,再通过 workerData 获取。 注意,在获取之前,需要先 require('@pipcook/boa'),Boa 会在初始化时对 workerData 中的 SharedPythonObject 实例做一些预处理,这样在使用过程中就能自然地使用了。


以上就是在 worker_threads 中异步使用 Boa 的所有接口了,学习成本是不是相对于之前的几个方案都要简单呢。


设计与实现

整个接口的设计主要体现以下两方面:

  • 如何保证 Boa 在多线程的 Node.js 环境工作
  • 如何保证 Python 的线程安全

如何保证 Boa 在多线程的 Node.js 环境工作

由于 CPython 的设计便是单个进程中只允许一个执行上下文(哪怕是通过 Py_NewInterpreter()也需要使用 swap 机制来切换当前执行的解释器的哪一个),另一方面对于 Node.js 开发者来说,需要的也并非多线程地执行 Python 代码,而是在调用 Python 的时候,不阻塞当前 JavaScript 线程的执行。

那么本质上,我们就不需要 Py_NewInterpreter() 来为每个子线程都创建一个新的解释器,而是可以允许一个进程内只有一个解释器,然后通过 API 来在每个线程中都能使用这个解释器即可。

要做到上面所说的,只需通过 CPython 的 Py_IsInitialized() 函数进行判断即可完成。

如何保证 Python 的线程安全

至此,我们便能让 Boa 在 worker_threads 中工作了,那么接下来我们就来看看如何保证在工作线程/主线程中共享对象及其线程安全。

首先,我为每个 Python 对象增加了所有权(Ownership)这一概念(借鉴自 Rust),但与 Rust Ownership 不同的是,在子线程退出之后,所有权会回归到源对象。 对象所有权即是说,它表示当前> 主线程的该对象是否有权使用该对象。


在 SharedPythonObject 内部,会调用 requestOwnership 向主线程的对象“借走”所有权,此时在使用源对象时,会抛出一个 JavaScript 异常,而非段错误。

接下来,就可以将借走所有权的对象交给任何一个工作线程(通过 Worker 创建),当这个工作线程退出后,所有权会被恢复回源对象了。

**如何借走所有权?**在调用 requestOwnership 时,会将创建的 ObjectOwnership 指针转换为数值,并通过 workerData 传递到工作线程,并在初始化时,检查 workerData 中符合协议的对象,并从指针值重新找到 ObjectOwnership 对象来获取所有权及对应的对象。


由于所有权的实现是线程安全的,因此开发者基于所有权在不同线程中使用 Python 对象时,也是线程安全的,并且我在设计 SharedPythonObject 也参考了 v8 的 SharedArrayBuffer 来命名,便于开发者理解。

目前这个实现仍然在 CodeReview 中,大家对完整实现感兴趣的可以点击 PR链接