阅读 202

Node.js高级程序员晋升系列之-从0开始实现一个锁

接上一篇

Node.js高级程序员晋升系列之-Node进程模型解析和服务器端多进程部署

不是说Node是单线程,为什么会需要锁?

回答这个问题之前,我们先来看上一篇文章的一个例子

class WeixinTokenService {
    private token = null
    
    async getToken() {
        if (!this.token) {
            this.token = await someHttpResquest()
        }
        return this.token
    }
}
复制代码

这段代码具体在什么时候会出bug呢?假设遇到一种这样的顺序

  • 第一个请求到达,调用await someHttpRequest()之后交出控制权
  • 第二个请求到达,在判断!this.token时为真,也开始请求await someHttpRequest()
  • 之后哪一个请求后返回结果,this.token的值就是谁,但这个先后顺序是无法保证的,同时两次调用http接口这个行为也不是我们想要的

我们希望的应该是:当第一个接口请求的时候,另外一个接口就等着,等到第一个接口请求结束了,第二个接口就可以直接用第一个接口的结果

那我,我们第一反应就可以写出这样的代码

class WeixinTokenService {
    private token = null
    private isRequest = false
    private peddingCallback = []
    
    getToken(callback) {
        if (this.token) {
            // 已经有token了
            return callback(this.token)
        }
        this.peddingCallback.push(callback)
        if (this.isRequest) {
            return
        }
        this.isRequest = true
        someHttpResquest().then(token => {
            this.token = token
            this.paddingCallback.forEach(call => call())
            this.paddingCallback = []
            this.isRequest = false
        })
    }
}
复制代码

这么写下来,我们发现了几个问题

  • 代码逻辑很繁琐
  • 无法使用Promise了,外层的async函数也会受到影响,当然我们也可以在外面再套一层return new Promise,但总体来说这个解决方案很差

让我们回忆一下如果是在Java这种语言中,我们如何解决这个问题(为了引出后续的话题,我们使用了Lock来解决这个问题而非此处更适合的synchronized关键字)

public class WeixinTokenService {
    private String token = null
    private Lock lock = new Lock()
    
    public String getToken() {
        if (this.token != null) {
            return this.token
        }
        this.lock.lock()
        // 思考一下这里为什么会重复一句
        if (this.token != null) {
            return this.token
        }
        try {
            this.token = someHttpResquest()
        } finally {
            this.lock.unlock()
        }
        return this.token
    }
}
复制代码

是的,java直接用一个进程锁在开始http请求之前锁住,这样第一个请求将this.token赋值之后再释放出锁,等其他的线程再重新拿到锁的时候,就可以直接用之前的结果(见注释处)

这给了我们一个灵感,我们也希望我们能在某一个把代码锁住,等其他的请求完成之后,再继续进行,但是很遗憾,node是没有阻塞这个概念的(Java的Lock是基于阻塞,即将一个线程停止掉,知道某一个事件发生之后重新恢复),而node只有一根线程,你要是这个这个线程阻塞住了(比如while (true) {}),整个应用也基本挂了。

然后我们有另外一个武器,他就是Promise,我们可以这么思考,将

this.lock.lock()
//替换为
await this.lock.lock()
复制代码

基于这个思路,你能否写出一个Lock类来?

下面是我们的实现(下面的实现可以同时锁住多个key,同时为了说清楚中间变量,我们用到了typescript)


interface ResolveRejectStorage {
    key: string;
    resolve: (data: Unlocker) => void;
    reject: (err: any) => void;
}

class Unlocker {
    constructor(private lock: LockService, private key: string) {}

    release() {
        this.lock.releaseKeys(this.key);
        this.key = null;
    }
}

export class LockService {
    // 当前正在运行的任务的key集合
    private keyList = new Set<string>();
    private waiting: ResolveRejectStorage[] = [];

    lock(key: string): Promise<Unlocker> {
        // 返回这个promise给外层的代码等待
        return new Promise<Unlocker>((resolve, reject) => {
            this.waiting.push({
                resolve,
                reject,
                key,
            });
            this.lockReally();
        });
    }

    releaseKeys(key: string) {
        this.keyList.delete(key);
        // 尝试其他任务
        this.lockReally();
    }

    private lockReally() {
        for (let task of this.waiting) {
            // 如果keyList中任意一个keykeys包含,就认为有冲突 需要等待 否则可以运行
            if (!this.keyList.has(task.key)) {
                // 加入keyList
                this.keyList.add(task.key);
                // 在waitingremovelet index = this.waiting.indexOf(task);
                this.waiting.splice(index, 1);

                // 最终的外层返回值
                let unlock = new Unlocker(this, task.key);
                task.resolve(unlock);

                // 如果10s之后还没有release自动unlock(程序可能出错了)
                setTimeout(() => {
                    unlock.release();
                }, 10 * 1000);
            }
        }
    }
}



复制代码

基于LockService,我们的代码会变成这样

class WeixinTokenService {
    private token = null
    private lockService = new LockService()
    
    async getToken() {
        let unlocker = await this.lockService.lock('fetch-wx-token')
        try {
            if (this.token) {
                return this.token
            }
            this.token = await someHttpResquest()
            return this.token
        } finally {
            unlocker.release()
        }
        
    }
}
复制代码

或者我们可以给LockService封装一个统一的方法

export class LockService {
    async runInLock<T>(key: string, fn: () => T | Promise<T>) {
        const unlocker = await this.lock(key)
        try {
            return await fn()
        } finally {
            unlocker.release()
        }
    }
}
复制代码

那么我们的调用代码会变成这样

class WeixinTokenService {
    private token = null
    private lockService = new LockService()
    
    async getToken() {
        return this.lockService.runInLock('fetch-wx-token', async () => {
            if (this.token) {
                return this.token
            }
            this.token = await someHttpResquest()
            return this.token
        })
    }
}
复制代码

到现在为止,我们已经解决了单进程下的代码执行控制问题,我们只需要进行一个很小的推广就能适用于多进程的情况,我们下期再见。

欢迎加入Node开发高级进阶群,群主有时间就会给大家解决一些Node实战中会遇到的各种问题