那就先从 fs.readFie 开始
fs.readFile 是怎么工作的?
fs.readFile() 接收 3 个传参,分别是 path, options, callback。通过下面的代码可以看到,其中的 options 是一个可选的参数,callback 始终是取最后一个参数。path 支持路径字符或者文件标识符。
fs.readFile = function(path, options, callback) {
// 接收最后一个参数作为传参
callback = maybeCallback(arguments[arguments.length - 1]);
// 初始化 options
options = getOptions(options, { flag: 'r' });
// getPathFromURL
if (handleError((path = getPathFromURL(path)), callback))
return;
// path 和 callback 校验, path 中不能存在 `\u0000`
if (!nullCheck(path, callback))
return;
// 创建文件读取上下文实例
var context = new ReadFileContext(callback, options.encoding);
// 判断 path 是否是文件描述符
context.isUserFd = isFd(path);
// 创建一个文件读取实例
var req = new FSReqWrap();
req.context = context;
// 设置文件读取 after open 回调
req.oncomplete = readFileAfterOpen;
// path是文件描述符,直接在nextTick执行open的回调
if (context.isUserFd) {
process.nextTick(function() {
req.oncomplete(null, path);
});
return;
}
// 使用 `path` 模块对路径参数 makeLong 处理
// 调用 fs 模块的 open
binding.open(pathModule._makeLong(path),
stringToFlags(options.flag || 'r'),
0o666,
req);
};
readFileAfterOpen
这里 readFileAfterOpen() 是一个通用的回调参数, 主要进行 open 操作之后的异常处理以及调用下一步的 stat
function readFileAfterOpen (err, fd) {
var context = this.context;
// open 失败,执行回调
if (err) {
context.callback(err);
return;
}
// 给上下文对象赋值文件标识符
context.fd = fd;
// 创建一个新的文件请求
var req = new FSReqWrap();
req.oncomplete = readFileAfterStat;
req.context = context;
binding.fstat(fd, req);
}
ReadFileContext
ReadFileContext() 是读取文件的上下文的构造器,它的实例会有当前读取文件的标识符,大小,编码,读取的位置等关键的属性。
ReadFileContext() 还有两个原型方法 read() 和 close()。
ReadFileContext.prototype.read = function() {
...
var req = new FSReqWrap();
req.oncomplete = readFileAfterRead;
req.context = this;
binding.read(this.fd, buffer, offset, length, -1, req);
};
ReadFileContext.prototype.close = function(err) {
var req = new FSReqWrap();
req.oncomplete = readFileAfterClose;
req.context = this;
this.err = err;
if (this.isUserFd) {
process.nextTick(function() {
req.oncomplete(null);
});
return;
}
binding.close(this.fd, req);
};
readFileAfterRead() 绑定在 FSReqWrap 实例的 oncomplete 回调上,readFileAfterRead 会持续读取文件内容。
FSReqWrap
我们在上面的代码中见到所有涉及文件操作的回调的地方都看到了 FSReqWrap 的身影。下面我们来看看 FSReqWrap 是怎么实现的。
FSReqWrap 继承自 ReqWrap, ReqWrap 和上一篇文章提到的 HandleWrap 都是继承自 AsyncWrap。
class FSReqWrap: public ReqWrap<uv_fs_t> {
public:
enum Ownership { COPY, MOVE };
inline static FSReqWrap* New(Environment* env,
Local<Object> req,
const char* syscall,
const char* data = nullptr,
enum encoding encoding = UTF8,
Ownership ownership = COPY);
inline void Dispose();
...
};
我们回到最开始 JavaScript 部分,通过赋值req.oncomplete 实现的设置回调。那么 oncomplete() 是在上面时候执行的?
var req = new FSReqWrap();
req.context = context;
req.oncomplete = readFileAfterOpen;
Read
Read() 就是 process.binding('fs').read() 的实现, 这个实现是对read(2)的一个包装。 看到 Read() 的最后的 ASYNC_CALL() 和 SYNC_CALL(),差不多能得出结论,也就是实现 fs.readFile() 和 fs.readFileSync() 等同步和异步 文件系统API的实现基础。
static void Read(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
...
uv_buf_t uvbuf = uv_buf_init(const_cast<char*>(buf), len);
req = args[5];
if (req->IsObject()) {
ASYNC_CALL(read, req, UTF8, fd, &uvbuf, 1, pos);
} else {
SYNC_CALL(read, 0, fd, &uvbuf, 1, pos)
args.GetReturnValue().Set(SYNC_RESULT);
}
}
uv_buf_t 是用于保存数据的单元,它被抽象成了 buffer 结构,只保存了指向真实数据的指针(uv_buf_t.base) 以及真实数据的长度 (uv_buf_t.len)
ASYNC_CALL
#define ASYNC_CALL(func, req, encoding, ...) \
ASYNC_DEST_CALL(func, req, nullptr, encoding, __VA_ARGS__) \
ASYNC_DEST_CALL
#define ASYNC_DEST_CALL(func, request, dest, encoding, ...) \
Environment* env = Environment::GetCurrent(args); \
CHECK(request->IsObject()); \
FSReqWrap* req_wrap = FSReqWrap::New(env, request.As<Object>(), \
#func, dest, encoding); \
int err = uv_fs_ ## func(env->event_loop(), \
req_wrap->req(), \
__VA_ARGS__, \
After); \
req_wrap->Dispatched(); \
if (err < 0) { \
uv_fs_t* uv_req = req_wrap->req(); \
uv_req->result = err; \
uv_req->path = nullptr; \
After(uv_req); \
req_wrap = nullptr; \
} else { \
args.GetReturnValue().Set(req_wrap->persistent()); \
}
文件读写是通过 uv_fs_* 函数族和 uv_fs_t 结构体完成的。uv_fs_t 的 result 域保存了 uv_fs_open 回调函数打开的文件描述符。如果文件被正确地打开,我们可以开始读取了。
After
static void After(uv_fs_t *req) {
FSReqWrap* req_wrap = static_cast<FSReqWrap*>(req->data);
CHECK_EQ(req_wrap->req(), req);
req_wrap->ReleaseEarly(); // Free memory that's no longer used now.
...
// 执行回调
req_wrap->MakeCallback(env->oncomplete_string(), argc, argv);
uv_fs_req_cleanup(req_wrap->req());
req_wrap->Dispose();
函数 uv_fs_req_cleanup() 在文件系统操作结束后必须要被调用,用来回收在读写中分配的内存。
uv_fs_read
POST 是一个宏定义,他处理异步回调任务和同步任务。判断有异步回调的话,调用uv__work_submit() 将异步请求推入线程池。线程池最大数量限制是 128。
// deps/uv/src/unix/fs.c
int uv_fs_read(uv_loop_t* loop, uv_fs_t* req,
uv_file file,
const uv_buf_t bufs[],
unsigned int nbufs,
int64_t off,
uv_fs_cb cb) {
if (bufs == NULL || nbufs == 0)
return -EINVAL;
INIT(READ);
req->file = file;
req->nbufs = nbufs;
req->bufs = req->bufsml;
if (nbufs > ARRAY_SIZE(req->bufsml))
req->bufs = uv__malloc(nbufs * sizeof(*bufs));
if (req->bufs == NULL) {
if (cb != NULL)
uv__req_unregister(loop, req);
return -ENOMEM;
}
memcpy(req->bufs, bufs, nbufs * sizeof(*bufs));
req->off = off;
POST;
}
#define POST \
do { \
if (cb != NULL) { \
uv__work_submit(loop, &req->work_req, uv__fs_work, uv__fs_done); \
return 0; \
} \
else { \
uv__fs_work(&req->work_req); \
return req->result; \
} \
} \
while (0)
uv__work_submit
uv__work_submit() 接收 eventloop 结构体,uv__work 结构体以及uv__fs_work,uv__fs_done 函数。
// deps/uv/src/threadpool.c
void uv__work_submit(uv_loop_t* loop,
struct uv__work* w,
void (*work)(struct uv__work* w),
void (*done)(struct uv__work* w, int status)) {
uv_once(&once, init_once);
w->loop = loop;
w->work = work;
w->done = done;
post(&w->wq);
}
static void post(QUEUE* q) {
// 添加互斥锁
uv_mutex_lock(&mutex);
// 插入链表队尾
QUEUE_INSERT_TAIL(&wq, q);
if (idle_threads > 0)
uv_cond_signal(&cond);
// unlock
uv_mutex_unlock(&mutex);
}
worker
worker 顺序取出队列的第一个任务,并执行 w->work(w),最后调用 uv_async_send() 。直至 exit_message 时退出。
static void worker(void* arg) {
struct uv__work* w;
QUEUE* q;
(void) arg;
for (;;) {
uv_mutex_lock(&mutex);
while (QUEUE_EMPTY(&wq)) {
idle_threads += 1;
uv_cond_wait(&cond, &mutex);
idle_threads -= 1;
}
q = QUEUE_HEAD(&wq);
if (q == &exit_message)
uv_cond_signal(&cond);
else {
QUEUE_REMOVE(q);
QUEUE_INIT(q); /* Signal uv_cancel() that the work req is
executing. */
}
uv_mutex_unlock(&mutex);
if (q == &exit_message)
break;
w = QUEUE_DATA(q, struct uv__work, wq);
w->work(w);
uv_mutex_lock(&w->loop->wq_mutex);
w->work = NULL; /* Signal uv_cancel() that the work req is done
executing. */
QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
uv_async_send(&w->loop->wq_async);
uv_mutex_unlock(&w->loop->wq_mutex);
}
}
uv__async_send
uv_async_send() 的作用是通知 io_watcher 执行相应线程上的回调。
// deps/uv/src/unix/async.c
void uv__async_send(struct uv__async* wa) {
const void* buf;
ssize_t len;
int fd;
int r;
buf = "";
len = 1;
fd = wa->wfd;
#if defined(__linux__)
if (fd == -1) {
static const uint64_t val = 1;
buf = &val;
len = sizeof(val);
fd = wa->io_watcher.fd; /* eventfd */
}
#endif
do
r = write(fd, buf, len);
while (r == -1 && errno == EINTR);
if (r == len)
return;
if (r == -1)
if (errno == EAGAIN || errno == EWOULDBLOCK)
return;
abort();
}
总结
我们通过以上的源码,从 fs.readFile() 深入到了 libuv 的 uv_fs* 函数以及 uv_async_send()。
虽然我们在写 Node.js 时是在一条主线程中, 我们不需要考虑变量的共享以及锁的问题。但当我们处理异步 IO 操作中,背后是多个线程处理异步 IO。