深入理解GO语言之并发机制

4,546 阅读6分钟

前言:可以说GO真正吸引到我的就是并发这块了,深入理解这个机制后让我收益匪浅,接下来就用自己薄弱的认知来谈谈GO的并发机制。

一,初始化过程

在这之前,先看下asm_arm64.s中的汇编代码关于启动这块的逻辑

CALL    runtime·args(SB)
CALL    runtime·osinit(SB)
CALL    runtime·hashinit(SB)
CALL    runtime·schedinit(SB)

// create a new goroutine to start program
PUSHQ    $runtime·main·f(SB)        // entry
PUSHQ    $0            // arg size
CALL    runtime·newproc(SB)
POPQ    AX
POPQ    AX

// start this M
CALL    runtime·mstart(SB)

接下来就进入分析环节

1,通过osinit函数还获取cpu个数和page的大小,这块挺简单的
2,接下来看看schedinit函数(跟本节相关的重要代码)

func schedinit() {
    //获取当前的G
    _g_ := getg()
    if raceenabled {
        _g_.racectx, raceprocctx0 = raceinit()
    }
    //设置M的最大数量
    sched.maxmcount = 10000
    //初始化栈空间
    stackinit()
    //内存空间初始化操作
    mallocinit()
    //初始化当前的M
    mcommoninit(_g_.m)

    //将P的数量调整为CPU数量
    procs := ncpu
    if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
        procs = n
    }
    if procs > _MaxGomaxprocs {
        procs = _MaxGomaxprocs
    }
    //初始化P
    if procresize(procs) != nil {
        throw("unknown runnable goroutine during bootstrap")
    }

}

3,上面我们可以看到调用了procresize函数来初始化P,那么我们来看下procresize函数。这块代码过长,分几个部分解析(只贴重要的代码)
(1) 初始化新的P

for i := int32(0); i < nprocs; i++ {
        pp := allp[i]
        if pp == nil {
            //新建一个P对象
            pp = new(p)
            pp.id = i
            pp.status = _Pgcstop
            //保存到allp数组(负责存储P的数组)
            atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
        }
        //如果P还没有cache,那么进行分配
        if pp.mcache == nil {
            if old == 0 && i == 0 {
                if getg().m.mcache == nil {
                    throw("missing mcache?")
                }
                pp.mcache = getg().m.mcache // bootstrap
            } else {
                pp.mcache = allocmcache()//分配cache
            }
        }
    }

(2) 释放没被使用的P

for i := nprocs; i < old; i++ {
        p := allp[i]
        // 将本地任务添加到全局队列中
        for p.runqhead != p.runqtail {
            p.runqtail--
            gp := p.runq[p.runqtail%uint32(len(p.runq))].ptr()
            // 插入全局队列的头部
            globrunqputhead(gp)
        }
        //释放P所绑定的cache
        freemcache(p.mcache)
        p.mcache = nil
        //将当前的P的G复用链接到全局
        gfpurge(p)
        p.status = _Pdead
        // can't free P itself because it can be referenced by an M in syscall
    }

经过这两个步骤后,那么我们就创建了一批的P,闲置的P会被放进调度器Sched的空闲链表中

二,创建G的过程

从上面的汇编代码可以看出接下来会去调用newproc函数来创建主G,然后用这个主函数去执行runtime.main,然后创建一个线程(这个线程在运行期间专门负责系统监控),接下来就进入GO程序中的main函数去运行了。
先看下newproc代码

func newproc(siz int32, fn *funcval) {
    argp := add(unsafe.Pointer(&fn), sys.PtrSize)//获取参数的地址
    pc := getcallerpc(unsafe.Pointer(&siz))//获取调用方的PC支
    systemstack(func() {
        newproc1(fn, (*uint8)(argp), siz, 0, pc)//真正创建G的地方
    })
}

接下来看下newpro1的主要代码

func newproc1(fn *funcval, argp *uint8, narg int32, nret int32, callerpc uintptr) *g {
    //从当前P复用链表来获取G
    _p_ := _g_.m.p.ptr()
    newg := gfget(_p_)
    //如果获取失败,则新建一个
    if newg == nil {
        newg = malg(_StackMin)
        casgstatus(newg, _Gidle, _Gdead)
        allgadd(newg) 
    }
    //将得到的G放入P的运行队列中
    runqput(_p_, newg, true)
    //下面三个条件分别为:是否有空闲的P;M是否处于自旋状态;当前是否创建runteime.main
    if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && runtimeInitTime != 0 {
        wakep()
    }

}

这个wakep()函数的代码也是值得一看的,这个思想可以用到平时的代码编程中去

func wakep() {
    //线程被唤醒后需要绑定一个P,这里使用cas操作,可以避免唤醒过多线程,这里也对应了上面的三个判断条件之一
    if !atomic.Cas(&sched.nmspinning, 0, 1) {
        return
    }
    startm(nil, true)
}

startm的代码就留给读者自己去看了,不然感觉整个博文都是代码,主要的思想是:获取一个空闲的P(如果传入的P为空),然后先尝试获取空闲M(空闲的M被调度器schedt管理,这个结构体也可以去看下),获取不到再去创建一个M等。

三,Channel

这块就稍微比较简单了,代码也不多,但是看下来收获还是很多的

1,创建Channel

先看下结构体定义(有删减)

type hchan struct {
    qcount   uint           // 队列中数据个数
    dataqsiz uint           // 缓冲槽大小
    buf      unsafe.Pointer // 指向缓冲槽的指针
    elemsize uint16         // 数据大小
    closed   uint32         // 表示 channel 是否关闭
    elemtype *_type // 数据类型
    sendx    uint   // 发送位置索引
    recvx    uint   // 接收位置索引
    recvq    waitq  // 接收等待列表
    sendq    waitq  // 发送等待列表
    lock mutex      // 锁
}
type sudog struct {
    g          *g
    selectdone *uint32 // CAS to 1 to win select race (may point to stack)
    next       *sudog
    prev       *sudog
    elem       unsafe.Pointer // data element (may point to stack)
    waitlink    *sudog // g.waiting list or semaRoot
    waittail    *sudog // semaRoot
    c           *hchan // channel
}

上面的recvq其实是读操作阻塞在channel的G列表,sendq其实是写操作阻塞在channel的G列表,那么G可以同时阻塞在不同的channel上,那么如何解决呢?这时候就引入了sudog,它其实是对G的一个包装,代表在等待队列上的一个G。

接下来看看创建过程

func makechan(t *chantype, size int64) *hchan {
    elem := t.elem

    // 大小不超过64K
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
    var c *hchan
    // 整个创建过程还是简单明了的
    if elem.kind&kindNoPointers != 0 || size == 0 {
        //一次性分配内存
        c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
        if size > 0 && elem.size != 0 {
            c.buf = add(unsafe.Pointer(c), hchanSize)
        } else {
            c.buf = unsafe.Pointer(c)
        }
    } else {
        c = new(hchan)
        c.buf = newarray(elem, int(size))
    }
    //设置数据大小,类型和缓冲槽大小
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)

    return c
}

2,发送

send函数的代码有点长,接下来就拆分进行说明
(1) 如果recvq有G在阻塞,那么就从该队列取出该G,将数据给该G

if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

(2) 如果hchan.buf还有可用的空间,那么就将数据放入

//通过比较qcount和datasiz来判断是否还有可用空间
if c.qcount < c.dataqsiz {
        // 将数据放入buf中
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }

(3) hchan.buf满了,那么就会阻塞住了

// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
    mysg.releasetime = -1
}
//初始化一些参数
mysg.elem = ep         
mysg.waitlink = nil
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 将当前 goroutine加入等待队列
c.sendq.enqueue(mysg)   
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

这里我们就可以看到了,如果满了,那么sudog就会出现了,通过初始化后代表当前G进入等待队列

3,接收

同理,接收也分为三种情况

(1) 当前有发送goroutine阻塞在channel上,buf满了

if sg := c.sendq.dequeue(); sg != nil {
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

(2) buf中有数据

if c.qcount > 0 {
        // 直接从队列中接收
        qp := chanbuf(c, c.recvx)
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }

(3) buf中无数据了,那么则会阻塞住

    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // 同样的,由sudog代表G去排队
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.selectdone = nil
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)
    goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)

总结:虽然这块代码逻辑不复杂,但是设计的东西很多,还是用了很多时间,现在对M执行G的逻辑是懂了,但是还不清楚细节,后面会继续研究。总的读下来,首先第一是对并发的机制可以说是很了解了,对以后在编写相关代码肯定很有帮助。第二,学习到了一些编程思想,例如cas操作,如何更好的进行封装和抽象等。