Golang源码阅读 调度器部分之一:M,P,G的结构及调度器初始化
M,P,G的结构及调度器初始化
M,P,G及调度器schedt的结构
代码位置runtime/runtime2.go,我这里只将m中与p和G及调度相关的部分列出来
M的结构
type m struct{
g0 *g //主os线程
curg *g //当前正运行的G
spinning bool //m当前没有运行 work且正处于寻找work的活跃状态
p puintptr //绑定的p
}
P的结构
type p struct{
m muintptr //p反向链接的m
// 可运行的G队列
runqhead uint32
runqtail uint32
runq [256]guintptr
gFree struct { //结束了的g,复用
gList
n int32
}
}
G的结构
type g struct{
stack stack //goroutine的栈内存
......
m *m //当前的m
.......
waiting *sudog //当前G等待其他G的队列,按照等待的序列
}
schedt的结构
type schedt struct {
// m相关
midle muintptr // 空闲m链表
nmidle int32 // 空闲m的数量
nmidlelocked int32 // 锁住的空闲m的数量
mnext int64 // 已经创建的m的数量,下一个创建的m的id
maxmcount int32 // 允许的最多m的数量
// p相关
pidle puintptr // 空闲p链表
npidle uint32 // 空闲p数量
// G相关
// G的id
goidgen uint64
// 全局可运行G队列
runq gQueue
runqsize int32
// 死亡dead状态的G缓存
gFree struct {
lock mutex
stack gList // 包含栈的 Gs
noStack gList // 没有栈的 Gs
n int32
}
// 阻塞G的集中缓存
sudoglock mutex
sudogcache *sudog
}
调度器初始化
M/P/G彼此的初始化顺序遵循:mcommoninit –> procresize –> newproc。代码位置runtime/proc.go
M初始化
M是OS线程,有两个状态:自旋,非自旋。
如果一个工作线程的本地队列、全局运行队列或 netpoller中均没有工作,则该线程成为自旋线程。
自旋一段时间后,从per-P中运行队列中寻找工作。如果一个自旋进程发现工作,就会将自身切换出自旋状态,并且开始执行。如果它没有发现工作则会将自己进行暂止。
调度器初始化时,只有主线程m,mcommoninit对主线程m进行了初始化,并加入了全局allm
func mcommoninit(mp *m) {
......
lock(&sched.lock)
// 初始化了一下id,增加调度器中m数量,并检查m数量
mp.id = sched.mnext
sched.mnext++
checkmcount()
......
// 加入allm
mp.alllink = allm
......
unlock(&sched.lock)
......
}
P初始化
P有以下5个状态:
- _Pidle
- _Prunning
- _Psyscall
- _Pgcstop
- _Pdead
当程序刚开始运行进行初始化时,所有的P都处于_Pgcstop状态,随着P的初始化(runtime.procresize),会被置于_Pidle。
当M需要运行时,会runtime.acquirep,并通过 runtime.releasep来释放。
当G执行时需要进入系统调用时,P会被设置为_Psyscall,如果这个时候被系统监控抢夺(runtime.retake),则P会被重新修改为_Pidle.如果在程序运行中发生GC,则P会被设置为_Pgcstop,并在runtime.startTheWorld 时重新调整为_Pidle或者_Prunning
p的初始化主要发生了以下几个事情:
1. 记录修改p的时间到调度器
2. 调整默认allp(默认为cpu核数)等于nprocs(我们通过runtime.GOMAXPROCS()函数设置的p)
3. 初始化一下allp中的p
4. 修剪p
5. 如果当前p没被修剪,继续运行;否则,将当前p换为allp[0]
6. 遍历p,没有任务的放到调度器sched的空闲链表pidle。有任务的p获取一个m绑定,并串成一个链表返回。
G及主goroutine的初始化
当调度器初始化之后,主goroutine开始被调度器调度。由newproc()来完成主 goroutine的初始化工作。
G的状态如有以下部分:
- _Gidle
- _Grunnable
- _Grunning
- _Gsyscall
- _Gwaiting
- _Gdead
- _Gcopystack
- _Gscan
- _Gscanrunnable
- _Gscanrunning
- _Gscansyscall
- _Gscanwaiting
在newproc()函数中主要获取了要创建的函数地址,参数起始地址,参数长度,g0,调用方地址。调用了newproc1函数。
newproc1(fn, (*uint8)(argp), siz, gp, pc)
下面对newproc1函数的工作做一下说明:
1. 获取调用goutine的p,并从gfree列表(包括p上的及调度器的gfree)获取g以复用g。实际上获取不到,所以创建了一个g,并添加到了allg中。
2. g的初始创建状态为_Gdead,当有参数时,将参数拷贝到goroutine的执行栈。
3. 初始化以下g,将G的初始地址定义好,然后将g的状态只为_Grunnable,分配一个goid。
4. 将新创建的g放入p的本地队列或全局g队列。
5. 如果有空闲的p,且自旋的m为0,且主函数已运行,那么唤醒p。
具体代码如下:
func newproc1(fn *funcval, argp *uint8, narg int32, callergp *g, callerpc uintptr) {
// 从p或全局队列获取gfree
newg := gfget(_p_)
if newg == nil {
newg = malg(_StackMin)
casgstatus(newg, _Gidle, _Gdead)
allgadd(newg) // 将g放入allg
}
// 判断是否有参数,如果有将参数拷贝到执行栈
if narg > 0 {
memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg))
}
// 定义好goroutine的初始指令地址,即函数地址
newg.startpc = fn.fn
// 修改状态为_Grunnable
casgstatus(newg, _Gdead, _Grunnable)
// 分配并定义goroutine的id,goid
if _p_.goidcache == _p_.goidcacheend {
_p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
_p_.goidcache -= _GoidCacheBatch - 1
_p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
}
newg.goid = int64(_p_.goidcache)
// 将这里新创建的g放入p的本地队列或直接放入全局队列
// true表示放入执行队列的下一个,false表示放入队尾
runqput(_p_, newg, true)
}
下面对创建goroutine的newproc1这个函数中的几个终点部分分析一下
首先,是从gfree中获取g,从而复用的gfget函数。
该函数重点做了以下工作:
如果p中gfree不为空,那么尝试从p的gfree中获取,否则从调度器schedt的gfree获取。
p及schedt中的gfree都分为两种,一种是有栈的,另一个没有栈,在获取时更优先获取有栈的。
func gfget(_p_ *p) *g {
retry:
if _p_.gFree.empty() && (!sched.gFree.stack.empty() || !sched.gFree.noStack.empty()) {
lock(&sched.gFree.lock)
// Move a batch of free Gs to the P.
for _p_.gFree.n < 32 {
// Prefer Gs with stacks.
gp := sched.gFree.stack.pop()
if gp == nil {
gp = sched.gFree.noStack.pop()
if gp == nil {
break
}
}
sched.gFree.n--
_p_.gFree.push(gp)
_p_.gFree.n++
}
unlock(&sched.gFree.lock)
goto retry
}
gp := _p_.gFree.pop()
if gp == nil {
return nil
}
_p_.gFree.n--
if gp.stack.lo == 0 {
// Stack was deallocated in gfput. Allocate a new one.
systemstack(func() {
gp.stack = stackalloc(_FixedStack)
})
gp.stackguard0 = gp.stack.lo + _StackGuard
} else {
if raceenabled {
racemalloc(unsafe.Pointer(gp.stack.lo), gp.stack.hi-gp.stack.lo)
}
if msanenabled {
msanmalloc(unsafe.Pointer(gp.stack.lo), gp.stack.hi-gp.stack.lo)
}
}
return gp
}
然后,是将g放入p或者全局队列的函数runqput。
主要发生了以下工作:
1. 将g放入p的队列中。假如p中g队列runq已满,那么放入全局队列即调度器schedt的runq中
2. 在放入全局队列时,会从p队列中拿取一半的goroutine一起放入全局队列
// 判断放入p中还是放入全局队列
func runqput(_p_ *p, gp *g, next bool) {
......
h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
t := _p_.runqtail
if t-h < uint32(len(_p_.runq)) {
_p_.runq[t%uint32(len(_p_.runq))].set(gp)
atomic.StoreRel(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
return
}
if runqputslow(_p_, gp, h, t) {
return
}
}
// 从p的G数组runq中拿取一半goroutine,并将最开始的g也放入batch[n]中
func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
ar batch [len(_p_.runq)/2 + 1]*g
// First, grab a batch from local queue.
n := t - h
n = n / 2
if n != uint32(len(_p_.runq)/2) {
throw("runqputslow: queue is not full")
}
for i := uint32(0); i < n; i++ {
batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
}
if !atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume
return false
}
batch[n] = gp
......
globrunqputbatch(&q, int32(n+1))
.....
}
// 批量放入全局队列
func globrunqputbatch(batch *gQueue, n int32) {
sched.runq.pushBackAll(*batch)
sched.runqsize += n
*batch = gQueue{}
}