不使用锁的问题

运行下面的代码,你会发现结果x,每次可能都不一样,有时候是正确结果,有时候是错误的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package main

import (
"fmt"
"sync"
)

func main() {
x := 0
num := 1000
var wg sync.WaitGroup
wg.Add(num)
for i := 0; i < num; i++ {
go func() {
x = x + 1
wg.Done()
}()
}
wg.Wait()
fmt.Println(x)
}

这里导致问题的原因就是多个协程共同对x赋值操作。

go-34-001

下面我们对他进行改造

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func main() {
var x int32
num := 1000
var wg sync.WaitGroup
wg.Add(num)
for i := 0; i < num; i++ {
go func() {
atomic.AddInt32(&x, 1)
wg.Done()
}()
}
wg.Wait()
fmt.Println(x)
}

这次改造以后,每次运行的结果都是1000,正确的结果。

atomic_amd64.s

1
2
3
4
5
6
7
8
9
10
11
12
TEXT ·Xaddint32(SB), NOSPLIT, $0-20
JMP ·Xadd(SB)

TEXT ·Xadd(SB), NOSPLIT, $0-20
MOVQ ptr+0(FP), BX
MOVL delta+8(FP), AX
MOVL AX, CX
LOCK
XADDL AX, 0(BX)
ADDL CX, AX
MOVL AX, ret+16(FP)
RET

8行 Lock,这个是cpu级别的锁,是硬件锁。

还可以使用另外一个方式的锁-sync.Mutex

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func main() {
var x int32
num := 1000
var m sync.Mutex
var wg sync.WaitGroup
wg.Add(num)
for i := 0; i < num; i++ {
go func() {
m.Lock()
x = x + 1
m.Unlock()
wg.Done()
}()
}
wg.Wait()
fmt.Println(x)
}

下面看一下Mutex,sync/mutex.go

1
2
3
4
type Mutex struct {
state int32
sema uint32
}

sema信号量锁是一个unit32值,表示可并发的数量,一个sema对应一个semaRoot结构体,是给协程队列排队使用的,它是一个平衡二叉树。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
type semaRoot struct {
lock mutex
treap *sudog
nwait uint32
}

type sudog struct {
g *g
next *sudog
prev *sudog
elem unsafe.Pointer
acquiretime int64
releasetime int64
ticket uint32
isSelect bool
success bool

parent *sudog
waitlink *sudog
waittail *sudog
c *hchan
}

3行 sudog的指针,是平衡二叉树的根节点

8行 包含一个协程

9行 后面的sudog指针

10行 前面的sudog指针

关系如下图

go-34-002

下面看一下如何获取sema

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
// Called from runtime.
func semacquire(addr *uint32) {
semacquire1(addr, false, 0, 0)
}

func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {
gp := getg()
if gp != gp.m.curg {
throw("semacquire not on the G stack")
}

if cansemacquire(addr) {
return
}

s := acquireSudog()
root := semtable.rootFor(addr)
t0 := int64(0)
s.releasetime = 0
s.acquiretime = 0
s.ticket = 0
if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
if t0 == 0 {
t0 = cputicks()
}
s.acquiretime = t0
}
for {
lockWithRank(&root.lock, lockRankRoot)
atomic.Xadd(&root.nwait, 1)
if cansemacquire(addr) {
atomic.Xadd(&root.nwait, -1)
unlock(&root.lock)
break
}
root.queue(addr, s, lifo)
goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
if s.ticket != 0 || cansemacquire(addr) {
break
}
}
if s.releasetime > 0 {
blockevent(s.releasetime-t0, 3+skipframes)
}
releaseSudog(s)
}

func cansemacquire(addr *uint32) bool {
for {
v := atomic.Load(addr)
if v == 0 {
return false
}
if atomic.Cas(addr, v, v-1) {
return true
}
}
}

func semrelease(addr *uint32) {
semrelease1(addr, false, 0)
}

func semrelease1(addr *uint32, handoff bool, skipframes int) {
root := semtable.rootFor(addr)
atomic.Xadd(addr, 1)
if atomic.Load(&root.nwait) == 0 {
return
}
lockWithRank(&root.lock, lockRankRoot)
if atomic.Load(&root.nwait) == 0 {
unlock(&root.lock)
return
}
s, t0 := root.dequeue(addr)
if s != nil {
atomic.Xadd(&root.nwait, -1)
}
unlock(&root.lock)
if s != nil {
acquiretime := s.acquiretime
if acquiretime != 0 {
mutexevent(t0-acquiretime, 3+skipframes)
}
if s.ticket != 0 {
throw("corrupted semaphore ticket")
}
if handoff && cansemacquire(addr) {
s.ticket = 1
}
readyWithTime(s, 5+skipframes)
if s.ticket == 1 && getg().m.locks == 0 {
goyield()
}
}
}

12行 试图获取sema信号量锁

52行 cansemacquire()使用了atomic包下的Load()和Cas(),也就是说使用CPU级别的锁,避免多协程并发数据不一致的问题。

70行 加1,就是释放sema信号量锁。

总结: sema是一个uint类型,

sema>0的情况, 获取信号量锁(sema-1),释放信号量锁(sema+1)

55-57行 sema==0 返回false。

40行 在semaRoot中,把s添加到block goroutines的队列中去

41行 进入休眠,不在调度,也就是会停到这个地方

79行 出队,拿出一个协程

Mutex->State

go-34-003

Locked:是否被上锁(0-未上锁,1-已上锁)

Woken:是否被唤醒

Starving:是否饥饿

WaiterShift:记录等待这个锁的协程的数量

非饥饿模式下的加锁和解锁

加锁流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
mutexWaiterShift = iota
}

func (m *Mutex) Lock() {
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
m.lockSlow()
}

func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++

old = m.state
continue
}
new := old
if old&mutexStarving == 0 {
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break
}
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}

if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}

9行 如果state是0,就改成1,也就是拿到锁,然后就return

15行 第一时间没拿到锁,进入lockSlow()

25行 是否被锁,是否饥饿,是否可以被自旋

26-29行 判断是否刚刚被唤醒

30行 继续自旋

31行 记录自旋次数

33行 回到上面for的地方,继续执行语句

37-39行 如果不是饥饿状态,那么就设置成加锁状态

40-42行 如果不再自旋,并且仍然是锁定状态,我们就把等待数量+1,然后进入52行,把新状态写入进去。

52行 进行加锁成功

53-55行 确认是否加锁成功,如果成功,就break出上面的for循环。

60行 如果sema==0,就会让协程休眠等待,进入队列,协程就会停在这一行,进入休眠等待

总结:

  1. 在多协程的情况下,每个协程尝试cas直接加锁,总有一个成功,那么其他的协程获取sema,但是sema这个时候是0,所以只能放到semaRoot的treap里,进行休眠等待,同时,wiaterShift,也会增加对应进入休眠等待协程的数量,当锁被释放后,会叫醒休眠等待的协程。

  2. 无法直接获取锁,多次自旋,多次失败,就进入sema队列等待。

解锁流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}

func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
fatal("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
old := new
for {
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
runtime_Semrelease(&m.sema, true, 1)
}
}

6-9行 减去1,如果new是0,证明没有等待的Goroutine,否则就会进入unlockSlow()

19行 如果是锁的状态或者唤醒状态或者饥饿状态或者更多的Goroutine的等待,就return

24行 从睡眠等待的Goroutine队列里,释放出来一个Goroutine,这个被释放的Goroutine会去Lock()的第61行,再去执行,然后再次进入加锁中的for循环,再次判断是否自旋,再次和其他Goroutine去竞争锁。

饥饿模式下加锁/解锁

当前Goroutine执行超过1毫秒,就会变成饥饿状态,饥饿模式下,新来的或者被唤醒的Goroutine,都不自旋,获取不到锁,就直接休眠等待。饥饿模式下,被唤醒的Goroutine直接获取锁。只有当没有Goroutine在队列中继续等待,就变成了正常模式。

其实,这个和我们买东西差不多,也就是先到先得,进行排队,如果再有新来的,就去队尾接着排,最终再按排队顺序执行,就可以了。

饥饿模式好处:

  • 降低系统负担,只要是饥饿模式,新来的Goroutine直接进入队列,减少了自旋给系统带来的压力。
  • 公平原则,只要出队列,直接拿到锁,不用和新来的Goroutine再次进行竞争锁。因为之前他已经参与了竞争和自旋,花费了时间,这次就不用竞争和等待,直接可以拿到锁。

加锁流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++

old = m.state
continue
}
new := old
if old&mutexStarving == 0 {
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break
}
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}

if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}

44行 判断是否是饥饿模式,如果starving是true,证明未来要设置饥饿模式,然后走到for循环后,再次循环到26行。

26-27行 设置变量new为饥饿模式

35行 写入饥饿模式,此时,锁就是饥饿状态了,再从新执行for循环的时候,新来的goroutine不再自旋,等待数量+1,就进入队列,然后执行runtime_SemacquireMutex()。

46-56行 饥饿模式下,唤醒的goroutine直接拿到锁。

如何学习Go语言微服务,快速步入架构师

从0到Go语言微服务架构师-海报 从0到Go语言微服务架构师
添加微信 公众号更多内容
wechat gzh