认知觉醒-学习Go语言协程的方法论

进程

001.进程

对于注册/注销一家公司,需要的手续和资料是很复杂的,那么这里我们把公司类比计算机中的进程。

线程

002.线程

每个公司都有各个部门,那么每个部门都可以共享公司的资源,如图中的会议室和茶水间。每个部门类似计算集中的线程。计算机中进程是开辟了一段内存空间,让程序在上面运行应用,而线程会占用一部分内存,线程之间的内存是共享的,他会占据cpu,这里可以假定cpu是老板。

003.线程做事

每个部门都要做事,最后都要老板来敲定最后的方案。那线程也一样,每个线程都少不了运行程序代码,假定我们只有一颗cpu,那么每个线程都需要这个cpu,cpu每次执行一小段时间,就要切换出去,执行下一个线程,当然,每个线程也要保持cpu的执行现场,以便下次再次执行到自己的时候,从哪里开始执行,就这样,最终全部线程执行完毕,看起来计算机好像同时在执行多个任务。所以,线程占用资源大,开销大,切换开销大。

协程

003.线程做事

同样还是这张图,老板只有1个,但是每次参加各个部门的会议,老板不再记录中间的会议纪要,由各个部门自己记录,在计算机中,我们用1个线程和1颗cpu,去让各个协程轮流执行,但是每个协程自己记录程序的运行现场,保持中间状态,这样线程就不必记录运行现场了,协程只要每次把自己的全部内容放到线程上执行,就可以了。协程也可以不依赖具体的哪个cpu而运行。

协程是如何调用的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import "fmt"

func buy(c chan struct{}) {
fmt.Println("已买菜,进入下一步")
cook(c)
}
func cook(c chan struct{}) {
fmt.Println("已做好饭,进入下一步")
eat(c)
}
func eat(c chan struct{}) {
fmt.Println("已吃完饭,进入下一步")
wash(c)
}
func wash(c chan struct{}) {
fmt.Println("洗碗结束")
c <- struct{}{}
}
func main() {
c := make(chan struct{})
go buy(c)
<-c
}

打印输出
已买菜,进入下一步
已做好饭,进入下一步
已吃完饭,进入下一步
洗碗结束

004.协程栈

这里是Goland中调试信息,可以看到最终我们是通过一个通道发送1个空结构体,然后主协程再退出的。

对于这个runtime.goexit()我们不清楚,那看一下

asm_amd64.s

1
2
3
TEXT runtime·goexit(SB),NOSPLIT|TOPFRAME,$0-0
BYTE $0x90
CALL runtime·goexit1(SB)

runtime/proc.go

1
2
3
4
5
6
7
8
9
func goexit1() {
...
mcall(goexit0)
}

func goexit0(gp *g) {
...
schedule()
}

3行 切换到g0栈

8行 又切回到g0栈的schedule

下面看一下Gorotine长什么样

runtime/runtime2.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
type g struct {
stack stack
m *m
sched gobuf
atomicstatus uint32
}

type stack struct {
lo uintptr
hi uintptr
}

type gobuf struct {
sp uintptr
pc uintptr
g guintptr
ctxt unsafe.Pointer
ret uintptr
lr uintptr
bp uintptr
}

type m struct {
g0 *g
...
curg *g
}
  1. statck - 协程在内存中开辟一个区域,也就是栈,区域的开头低(lo)地址,结尾是高(high)地址

  2. sched - gobuf

    1. sp(stack pointer)栈指针 当前协程调用到哪个栈针,不是方法。
    2. pc (programing count) 调用到哪个栈方法中的哪一行
  3. atomicstatus 当前协程的状态是什么

  4. m Go语言中定义的线程(这个是gmp模型中的重要内容)

  5. g0 Go语言在刚刚启动时候调用的groutine,它是根协程。

  6. curg 当前运行的协程,也是当前线程中运行的协程

g0栈

下面到runtime/proc.go

schedule() 线程启动的调度方法,为了找到一个可用的协程(本地的或者全局的),并且执行它。

1
2
3
4
5

func schedule() {
...
execute(gp, inheritTime) //gp-可用的goroutine
}

调度gp(goroutine)在当前的线程上运行,如果inheritTime是真,gp将继承当前时间片,否则,它将启动一个新的时间片。

1
2
3
4
func execute(gp *g, inheritTime bool) {
...
gogo(&gp.sched)
}

asm_amd64.s

1
2
3
4
5
6
7
8
9
10
11
12
TEXT runtime·gogo(SB), NOSPLIT, $0-8
...
JMP gogo<>(SB)

TEXT gogo<>(SB), NOSPLIT, $0
...
MOVQ DX, R14
MOVQ gobuf_sp(BX), SP
...
MOVQ $0, gobuf_sp(BX)
MOVQ gobuf_pc(BX), BX
JMP BX
  • 7行 设置协程注册器
  • 8行 恢复sp(stack pointer),就是goland上面截图的goexit()
  • 10行 帮助GC清理
  • 12行 跳转到我们业务中的gorotine里的pc(程序计数器),也就是我们的业务代码里。

GMP模型

005.GMP模型

采用这个模型的好处是每次一个M,可以拿到一匹要执行的Goroutine任务,这样就避免了每次执行一个goroutine都要去全局队列里拿,造成并发问题,性能低。

下面看一下P长什么样?

1
2
3
4
5
6
7
8
9
10
type p struct {
m muintptr
...
runqhead uint32
runqtail uint32
runq [256]guintptr
runnext guintptr
}

type muintptr uintptr

2行 uint的指针

4-6行 可运行的goroutine队列。 可以无锁队列,因为他是本地队列,只有一个线程访问,所以不用加锁

4行 队列的头

5行 队列的尾

6行 256个的goroutine数组,也就是队列长度。

7行 下一个要执行的goroutine

006.P结构体

007.P结构体

可以看到P是M和G的中间商,它负责把本地队里运送给M去执行。当某个本地队列都全部执行完G,那么可以去申请全局锁,再去全局里拿一匹G,放到本地去执行。

Runtime/proc.go

1
2
3
4
func schedule() {
_g_ := getg()
gp, inheritTime, tryWakeP := findRunnable()
}

2行 当前执行的协程

3行 获得一个可运行的goroutine执行,可以从自己的本地队列中拿到,还可以从其他P里面偷到,还可以从全局队列里拿到。

新建协程会怎样?

runtime/proc.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func newproc(fn *funcval) {
gp := getg()
...
newg := newproc1(fn, gp, pc)
runqput(_p_, newg, true)
}

func runqput(_p_ *p, gp *g, next bool) {
...
oldnext := _p_.runnext
if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
goto retryNext
}
if oldnext == 0 {
return
}
gp = oldnext.ptr()
}

将新的G随机找一个P,然后让runnext指向这个新的G,如果本地队列都满了,那么就放到全局队列里。

协程饥饿

如果本地队列,要执行一个任务,3分钟才能完成,这个在计算机中是非常非常长的时间了,很多其他等待任务已经超时了,还有就是剩余本地队列的小任务(只需要执行50毫秒的)无法执行,导致了协程的饥饿。

解决方案:

  • gopark() 将当前 goroutine 置于等待状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
if reason != waitReasonSleep {
checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
}
mp := acquirem()
gp := mp.curg
status := readgstatus(gp)
if status != _Grunning && status != _Gscanrunning {
throw("gopark: bad g status")
}
mp.waitlock = lock
mp.waitunlockf = unlockf
gp.waitreason = reason
mp.waittraceev = traceEv
mp.waittraceskip = traceskip
releasem(mp)
mcall(park_m)
}

2-16行 g和m设置lock相关

17行 无法执行任何可能操作,在 M 和G 之间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func park_m(gp *g) {
_g_ := getg()
if trace.enabled {
traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
}

casgstatus(gp, _Grunning, _Gwaiting)
dropg()

if fn := _g_.m.waitunlockf; fn != nil {
ok := fn(gp, _g_.m.waitlock)
_g_.m.waitunlockf = nil
_g_.m.waitlock = nil
if !ok {
if trace.enabled {
traceGoUnpark(gp, 2)
}
casgstatus(gp, _Gwaiting, _Grunnable)
execute(gp, true) // Schedule it back, never returns.
}
}
schedule()
}

park_m() 在g0上停止

23行 又调用了schedule(),线程开始执行的地方。

  • 系统调用,可以看到当系统调用以后,最终也调用了schedule(),返回了线程的初始的地方

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    func exitsyscall() {
    ..
    osched()
    }
    func Gosched() {
    heckTimeouts()
    call(gosched_m)
    }
    func gosched_m(gp *g) {
    f trace.enabled {
    raceGoSched()
    }
    oschedImpl(gp)
    }
    func goschedImpl(gp *g) {
    ..
    chedule()
    }

上面的gopark和系统调用可以回到线程开始的位置,但是如果没人调用这些,怎么办?

可以用go build -gcflags -S main.go查看

会有很多runtime.morestack(),因为在调用其他方法前,会有morestack。下面就看看它长啥样。

asm_amd64.s

1
2
3
4
TEXT runtime·morestack(SB),NOSPLIT,$0-0
...
CALL runtime·newstack(SB)
CALL runtime·abort(SB) // crash if newstack returns

stack.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func newstack() {
...
stackguard0 := atomic.Loaduintptr(&gp.stackguard0)
preempt := stackguard0 == stackPreempt
if preempt {
if gp == thisg.m.g0 {
throw("runtime: preempt g0")
}
if thisg.m.p == 0 && thisg.m.locks == 0 {
throw("runtime: g is running but p is not")
}

if gp.preemptShrink {
gp.preemptShrink = false
shrinkstack(gp)
}

if gp.preemptStop {
preemptPark(gp)
}

gopreempt_m(gp)
}
}
func gopreempt_m(gp *g) {
if trace.enabled {
traceGoPreempt()
}
goschedImpl(gp)
}
func goschedImpl(gp *g) {
status := readgstatus(gp)
if status&^_Gscan != _Grunning {
dumpgstatus(gp)
throw("bad g status")
}
casgstatus(gp, _Grunning, _Grunnable)
dropg()
lock(&sched.lock)
globrunqput(gp)
unlock(&sched.lock)
schedule()
}

4行 是否被抢占

25行 最终调用 schedule() 回到线程的初始位置

系统监控,当一个协程运行超过10毫秒,判定是大协程,可能造成其他协程饥饿,并将g结构体设置为抢占模式,就是stackguard0这个字段,然后morestack内部可以判断是否被抢占了,如果被抢占,就可以回sechedule()

基于信号量的抢占

如果上面的方法都不执行,那我们遇到一个大任务是不是就无能为力了呢?例如下面的代码

1
2
3
for{

}

我们的操作系统,会有很多底层通信的信号(线程的)。开始的时候可以把要处理的函数注册到SIGURG信号上,当GC的时候,给线程发信号,就可以了,等线程收到信号,处理函数执行就可以了。(非常巧妙)

总结

008.总结

如何学习Go语言微服务,快速步入架构师

从0到Go语言微服务架构师-海报 从0到Go语言微服务架构师
添加微信 公众号更多内容
wechat gzh