M,P,G的结构及调度器初始化

M,P,G及调度器schedt的结构

代码位置runtime/runtime2.go,我这里只将m中与p和G及调度相关的部分列出来

M的结构

type m struct{
    g0            *g  //主os线程 
    curg          *g  //当前正运行的G
    spinning      bool //m当前没有运行 work且正处于寻找work的活跃状态
    p             puintptr  //绑定的p
}

P的结构

type p struct{
    m           muintptr  //p反向链接的m

// 可运行的G队列
    runqhead uint32
    runqtail uint32
    runq     [256]guintptr

    gFree struct {       //结束了的g,复用
	    gList
	    n int32
    }

}

G的结构

type g struct{
    stack stack           //goroutine的栈内存
......
    m              *m    //当前的m
.......
    waiting        *sudog  //当前G等待其他G的队列,按照等待的序列      
}

schedt的结构

type schedt struct {
// m相关
    midle        muintptr // 空闲m链表
    nmidle       int32    // 空闲m的数量
    nmidlelocked int32    // 锁住的空闲m的数量
    mnext        int64    // 已经创建的m的数量,下一个创建的m的id
    maxmcount    int32    // 允许的最多m的数量

// p相关
    pidle      puintptr // 空闲p链表
    npidle     uint32 // 空闲p数量

// G相关
// G的id
    goidgen  uint64
// 全局可运行G队列
    runq     gQueue
    runqsize int32
// 死亡dead状态的G缓存
    gFree struct {
		lock    mutex
		stack   gList // 包含栈的 Gs
		noStack gList // 没有栈的 Gs
		n       int32
    }
// 阻塞G的集中缓存
    sudoglock  mutex
    sudogcache *sudog
}

调度器初始化

M/P/G彼此的初始化顺序遵循:mcommoninit –> procresize –> newproc。代码位置runtime/proc.go

M初始化

M是OS线程,有两个状态:自旋,非自旋。

如果一个工作线程的本地队列、全局运行队列或 netpoller中均没有工作,则该线程成为自旋线程。

自旋一段时间后,从per-P中运行队列中寻找工作。如果一个自旋进程发现工作,就会将自身切换出自旋状态,并且开始执行。如果它没有发现工作则会将自己进行暂止。

调度器初始化时,只有主线程m,mcommoninit对主线程m进行了初始化,并加入了全局allm

func mcommoninit(mp *m) {
......
    lock(&sched.lock)
// 初始化了一下id,增加调度器中m数量,并检查m数量
    mp.id = sched.mnext
    sched.mnext++
    checkmcount()
......
// 加入allm
    mp.alllink = allm
......
    unlock(&sched.lock)
......
}

P初始化

P有以下5个状态:

  • _Pidle
  • _Prunning
  • _Psyscall
  • _Pgcstop
  • _Pdead

当程序刚开始运行进行初始化时,所有的P都处于_Pgcstop状态,随着P的初始化(runtime.procresize),会被置于_Pidle。
当M需要运行时,会runtime.acquirep,并通过 runtime.releasep来释放。
当G执行时需要进入系统调用时,P会被设置为_Psyscall,如果这个时候被系统监控抢夺(runtime.retake),则P会被重新修改为_Pidle.如果在程序运行中发生GC,则P会被设置为_Pgcstop,并在runtime.startTheWorld 时重新调整为_Pidle或者_Prunning

p的初始化主要发生了以下几个事情:
1. 记录修改p的时间到调度器
2. 调整默认allp(默认为cpu核数)等于nprocs(我们通过runtime.GOMAXPROCS()函数设置的p)
3. 初始化一下allp中的p
4. 修剪p
5. 如果当前p没被修剪,继续运行;否则,将当前p换为allp[0]
6. 遍历p,没有任务的放到调度器sched的空闲链表pidle。有任务的p获取一个m绑定,并串成一个链表返回。

G及主goroutine的初始化

当调度器初始化之后,主goroutine开始被调度器调度。由newproc()来完成主 goroutine的初始化工作。

G的状态如有以下部分:
- _Gidle
- _Grunnable
- _Grunning
- _Gsyscall
- _Gwaiting
- _Gdead
- _Gcopystack
- _Gscan
- _Gscanrunnable
- _Gscanrunning
- _Gscansyscall
- _Gscanwaiting

在newproc()函数中主要获取了要创建的函数地址,参数起始地址,参数长度,g0,调用方地址。调用了newproc1函数。

newproc1(fn, (*uint8)(argp), siz, gp, pc)

下面对newproc1函数的工作做一下说明:
1. 获取调用goutine的p,并从gfree列表(包括p上的及调度器的gfree)获取g以复用g。实际上获取不到,所以创建了一个g,并添加到了allg中。
2. g的初始创建状态为_Gdead,当有参数时,将参数拷贝到goroutine的执行栈。
3. 初始化以下g,将G的初始地址定义好,然后将g的状态只为_Grunnable,分配一个goid。
4. 将新创建的g放入p的本地队列或全局g队列。
5. 如果有空闲的p,且自旋的m为0,且主函数已运行,那么唤醒p。

具体代码如下:

func newproc1(fn *funcval, argp *uint8, narg int32, callergp *g, callerpc uintptr) {
// 从p或全局队列获取gfree
    newg := gfget(_p_)
    if newg == nil {
	    newg = malg(_StackMin)
	    casgstatus(newg, _Gidle, _Gdead)
	    allgadd(newg) // 将g放入allg
    }
// 判断是否有参数,如果有将参数拷贝到执行栈
    if narg > 0 {		              
    memmove(unsafe.Pointer(spArg),      unsafe.Pointer(argp), uintptr(narg))
    }
// 定义好goroutine的初始指令地址,即函数地址
    newg.startpc = fn.fn 
// 修改状态为_Grunnable
    casgstatus(newg, _Gdead, _Grunnable)
// 分配并定义goroutine的id,goid
    if _p_.goidcache == _p_.goidcacheend {
	_p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
	_p_.goidcache -= _GoidCacheBatch - 1
	_p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
    }
    newg.goid = int64(_p_.goidcache)
// 将这里新创建的g放入p的本地队列或直接放入全局队列
// true表示放入执行队列的下一个,false表示放入队尾
    runqput(_p_, newg, true)
}

下面对创建goroutine的newproc1这个函数中的几个终点部分分析一下

首先,是从gfree中获取g,从而复用的gfget函数。
该函数重点做了以下工作: 如果p中gfree不为空,那么尝试从p的gfree中获取,否则从调度器schedt的gfree获取。
p及schedt中的gfree都分为两种,一种是有栈的,另一个没有栈,在获取时更优先获取有栈的。

func gfget(_p_ *p) *g {
retry:
	if _p_.gFree.empty() && (!sched.gFree.stack.empty() || !sched.gFree.noStack.empty()) {
		lock(&sched.gFree.lock)
		// Move a batch of free Gs to the P.
		for _p_.gFree.n < 32 {
			// Prefer Gs with stacks.
			gp := sched.gFree.stack.pop()
			if gp == nil {
				gp = sched.gFree.noStack.pop()
				if gp == nil {
					break
				}
			}
			sched.gFree.n--
			_p_.gFree.push(gp)
			_p_.gFree.n++
		}
		unlock(&sched.gFree.lock)
		goto retry
	}
	gp := _p_.gFree.pop()
	if gp == nil {
		return nil
	}
	_p_.gFree.n--
	if gp.stack.lo == 0 {
		// Stack was deallocated in gfput. Allocate a new one.
		systemstack(func() {
			gp.stack = stackalloc(_FixedStack)
		})
		gp.stackguard0 = gp.stack.lo + _StackGuard
	} else {
		if raceenabled {
			racemalloc(unsafe.Pointer(gp.stack.lo), gp.stack.hi-gp.stack.lo)
		}
		if msanenabled {
			msanmalloc(unsafe.Pointer(gp.stack.lo), gp.stack.hi-gp.stack.lo)
		}
	}
	return gp
}

然后,是将g放入p或者全局队列的函数runqput。
主要发生了以下工作:
1. 将g放入p的队列中。假如p中g队列runq已满,那么放入全局队列即调度器schedt的runq中
2. 在放入全局队列时,会从p队列中拿取一半的goroutine一起放入全局队列

// 判断放入p中还是放入全局队列
func runqput(_p_ *p, gp *g, next bool) {
......
	h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
	t := _p_.runqtail
	if t-h < uint32(len(_p_.runq)) {
		_p_.runq[t%uint32(len(_p_.runq))].set(gp)
		atomic.StoreRel(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
		return
	}
	if runqputslow(_p_, gp, h, t) {
		return
	}
}

// 从p的G数组runq中拿取一半goroutine,并将最开始的g也放入batch[n]中
func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
ar batch [len(_p_.runq)/2 + 1]*g

	// First, grab a batch from local queue.
	n := t - h
	n = n / 2
	if n != uint32(len(_p_.runq)/2) {
		throw("runqputslow: queue is not full")
	}
	for i := uint32(0); i < n; i++ {
		batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
	}
	if !atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume
		return false
	}
	batch[n] = gp
......
globrunqputbatch(&q, int32(n+1))
.....
}

// 批量放入全局队列
func globrunqputbatch(batch *gQueue, n int32) {
	sched.runq.pushBackAll(*batch)
	sched.runqsize += n
	*batch = gQueue{}
}