Golang源码阅读调度器部分之二:调度循环
Golang源码阅读调度器部分之二:调度循环
当调度器初始化完成之后,就进入了调度循环。
调度循环
调度循环过程主要由mstart完成,而mstart主要调用了mstart1这个函数:
func mstart() {
......
mstart1()
......
}
而mstart1则主要调用了schdule这个函数,且无返回。
对于m0来说,无需获取p;对于后边的m,需要获取p绑定。
......
schedule()
下面进入调度循环最重要的部分schedule()函数,该函数也是不返回的。schedule函数的主要作用是:
1. 从实体调度器结构schedt全局g获取g;
2. 如果没找到,从本地p获取一个g;
3. 如果还没找到,进行偷取工作。偷取工作有以下步骤:
1. 首先从本地队列拿取;
2. 如果还没找到,从全局队列偷取;
3. 如果依旧没找到,从poll网络检查g,有的话直接返回;
4. 最后仍旧没取到,那么从其他p这个中偷取,总共进行4次,如果有取到,跳出循环,直接返回。
5. 如果仍旧没取到,那么准备归还当前p,即将当前p状态置为idle。这个动作先对当前所有p进行快照;然后再检查schedt的全局g队列是否有g可以运行,有就直接返回,没有就将当前运行的p释放,并放入schedt的idlep队列。然后将m从自旋变为非自旋。
6. 最后再检查一遍runing状态p队列是否有可运行的g,有的话再重新获取一个p,并将p绑定到m,并且如果m已经变为非自旋,那么修改m为可运行任务的自选状态。
7. 最后真的什么都没获取到,暂止当前m。
- 最后我们假定获取到了g,那么进行execute函数。execute将g从runable状态置为runing状态,然后执行gogo()函数;gogo函数调用完之后,通过goexit函数来完成善后及重新调度工作。
以下是schedule函数:
func schedule() {
var gp *g
var inheritTime bool
// 从全局调取器的g队列偷取
if gp == nil {
// Check the global runnable queue once in a while to ensure fairness.
// Otherwise two goroutines can completely occupy the local runqueue
// by constantly respawning each other.
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
// 从本地p的g队列拿取
if gp == nil {
gp, inheritTime = runqget(_g_.m.p.ptr())
if gp != nil && _g_.m.spinning {
throw("schedule: spinning with local work")
}
}
//从其他p队列偷取g
if gp == nil {
gp, inheritTime = findrunnable() // blocks until work is available
}
execute(gp, inheritTime)
}
从全局调度器g队列偷取的代码:
func globrunqget(_p_ *p, max int32) *g {
if sched.runqsize == 0 {
return nil
}
// 取调度器g数量/p的数量+1
n := sched.runqsize/gomaxprocs + 1
if n > sched.runqsize {
n = sched.runqsize
}
if max > 0 && n > max {
n = max
}
// 如果偷取的g数量大于本地p的可运行g数量的1/2,置为1/2
if n > int32(len(_p_.runq))/2 {
n = int32(len(_p_.runq)) / 2
}
sched.runqsize -= n
gp := sched.runq.pop()
n--
for ; n > 0; n-- {
gp1 := sched.runq.pop()
runqput(_p_, gp1, false)
}
return gp
}
总体偷取函数findrunnable代码如下:
func findrunnable() (gp *g, inheritTime bool) {
// 保存快照
allpSnapshot := allp
// 从本地取
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime
}
// 全局偷取
if sched.runqsize != 0 {
lock(&sched.lock)
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
if gp != nil {
return gp, false
}
}
// 从其他p取,4次尝试
procs := uint32(gomaxprocs)
for i := 0; i < 4; i++ {
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
if sched.gcwaiting != 0 {
goto top
}
stealRunNextG := i > 2 // first look for ready queues with more than 1 g
if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
return gp, false
}
}
}
// 准备停止p,再检查一遍p的可运行g数量
allpSnapshot := allp
lock(&sched.lock)
if sched.runqsize != 0 {
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
return gp, false
}
if releasep() != _p_ {
throw("findrunnable: wrong p")
}
pidleput(_p_)
unlock(&sched.lock)
// 再检查一遍所有p的运行队列
for _, _p_ := range allpSnapshot {
if !runqempty(_p_) {
lock(&sched.lock)
_p_ = pidleget()
unlock(&sched.lock)
if _p_ != nil {
acquirep(_p_)
if wasSpinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
goto top
}
break
}
}
// 停止m
stopm()
}