Channel,这次换个学法

1
2
3
4
5
c := make(chan string)
c<-"面向加薪学习"
<-c

c := make(chan string,10)

1行 新建无缓冲通道

2行 向通道里输入数据

3行 从通道里拿出数据

5行 新建有缓冲通道

有缓冲通道类似快递柜,只要有空间,别人就可以投递,而无缓冲通道则只能双方都在场的情况下,才能让数据成功收发。

下面看一下通道的源代码runtime/chan.go

1
2
3
4
5
6
7
8
9
10
11
12
13
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
lock mutex
}

1行 channel的结构体

2行 环形队列的总长度

3行 环形队列的数据大小

4行 指向datasiz的数组元素的指针

5行 元素大小

6行 是否关闭

7行 元素类型

8行 发送队列索引

9行 接收队列索引

10行 接收队列

11行 发送队列

12行 锁,保护所有hchan结构体的字段,也就是说任何协程想操作通道中的字段,都要拿到这把锁。

go-35-001

为什么是环形队列

  1. 先进先出,队列本身的特性就保证的。

  2. 空间重复利用,固定的闭环,初始化即分配好内存空间,入队或出队只要返回内存空间的地址即可,重复利用,避免频繁的内存分配和释放的开销操作。类似,铁打的营盘,流水的兵, 数据都可以轮流放上去,但是地址,已经初始化的时候就固定了。

队列发送数据(runtime/chan.go)

1
2
3
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}

上面这个函数,就是向channel发送数据,编译后调用的,比如我们写的 c <- “面向加薪学习”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
lock(&c.lock)

if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}

if !block {
unlock(&c.lock)
return false
}

gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
}
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
...
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
...
goready(gp, skip+1)
}

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
dst := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}

收货人在家等待,可以直接上门送货

​ 9行, 从接收队列里出队一个

​ 10行 send()发送数据

​ 63行 sendDirect()直接发送

​ 66行 memmove,也就是内存移动,直接把copy过去

​ 60行 goready,唤醒该协程

收货人不在家,直接放快递柜

​ 14行 拿到缓存区一个地址

​ 18行 把数据拷贝给缓存地址即可

​ 19行 维护缓存的索引

收货人不在家,快递柜已满

34行 获取当前的协程

35-46行 包装成sudog

47行 sendq的入队

49行 休眠

队列接收数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
...
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}

lock(&c.lock)

if c.closed != 0 {
if c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
} else {
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}

if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}

if !block {
unlock(&c.lock)
return false, false
}

gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
...
}

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
} else {
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}

func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
src := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}

从快递小哥手里直接拿快递

30行 获取一个发送队列里的Goroutine

31行 recv()

79行 缓存区为空

84行 直接接收 recvDirect()

116行 内存移动memmove()

110行 唤醒Goroutine

从快递柜里拿快递

87行 从缓冲区拿出来数据

93行 把缓冲区的数据拷贝到要接收的Goroutine里

95行 把接下来要发送的数据再发送到缓冲区里

直接从快递柜拿走快递,无其他人等待

36行 队列里有数据

37行 拿到缓存区的数据

42行 把缓存区的数据给接收的Goroutine

只有收货人,没有快递小哥

59行 拿到当前的Goroutine

60-71行 包装成sudog

72行 recvq入队

74行 休眠

如何学习Go语言微服务,快速步入架构师

从0到Go语言微服务架构师-海报 从0到Go语言微服务架构师
添加微信 公众号更多内容
wechat gzh