不使用锁的问题 运行下面的代码,你会发现结果x,每次可能都不一样,有时候是正确结果,有时候是错误的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 package main import ( "fmt" "sync" ) func main() { x := 0 num := 1000 var wg sync.WaitGroup wg.Add(num) for i := 0; i < num; i++ { go func() { x = x + 1 wg.Done() }() } wg.Wait() fmt.Println(x) }
这里导致问题的原因就是多个协程共同对x赋值操作。
下面我们对他进行改造
1 2 3 4 5 6 7 8 9 10 11 12 13 14 func main() { var x int32 num := 1000 var wg sync.WaitGroup wg.Add(num) for i := 0; i < num; i++ { go func() { atomic.AddInt32(&x, 1) wg.Done() }() } wg.Wait() fmt.Println(x) }
这次改造以后,每次运行的结果都是1000,正确的结果。
atomic_amd64.s
1 2 3 4 5 6 7 8 9 10 11 12 TEXT ·Xaddint32(SB), NOSPLIT, $0-20 JMP ·Xadd(SB) TEXT ·Xadd(SB), NOSPLIT, $0-20 MOVQ ptr+0(FP), BX MOVL delta+8(FP), AX MOVL AX, CX LOCK XADDL AX, 0(BX) ADDL CX, AX MOVL AX, ret+16(FP) RET
8行 Lock,这个是cpu级别的锁,是硬件锁。
还可以使用另外一个方式的锁-sync.Mutex
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func main() { var x int32 num := 1000 var m sync.Mutex var wg sync.WaitGroup wg.Add(num) for i := 0; i < num; i++ { go func() { m.Lock() x = x + 1 m.Unlock() wg.Done() }() } wg.Wait() fmt.Println(x) }
下面看一下Mutex,sync/mutex.go
1 2 3 4 type Mutex struct { state int32 sema uint32 }
sema信号量锁是一个unit32值,表示可并发的数量,一个sema对应一个semaRoot结构体,是给协程队列排队使用的,它是一个平衡二叉树。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 type semaRoot struct { lock mutex treap *sudog nwait uint32 } type sudog struct { g *g next *sudog prev *sudog elem unsafe.Pointer acquiretime int64 releasetime int64 ticket uint32 isSelect bool success bool parent *sudog waitlink *sudog waittail *sudog c *hchan }
3行 sudog的指针,是平衡二叉树的根节点
8行 包含一个协程
9行 后面的sudog指针
10行 前面的sudog指针
关系如下图
下面看一下如何获取sema
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 // Called from runtime. func semacquire(addr *uint32) { semacquire1(addr, false, 0, 0) } func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) { gp := getg() if gp != gp.m.curg { throw("semacquire not on the G stack") } if cansemacquire(addr) { return } s := acquireSudog() root := semtable.rootFor(addr) t0 := int64(0) s.releasetime = 0 s.acquiretime = 0 s.ticket = 0 if profile&semaBlockProfile != 0 && blockprofilerate > 0 { t0 = cputicks() s.releasetime = -1 } if profile&semaMutexProfile != 0 && mutexprofilerate > 0 { if t0 == 0 { t0 = cputicks() } s.acquiretime = t0 } for { lockWithRank(&root.lock, lockRankRoot) atomic.Xadd(&root.nwait, 1) if cansemacquire(addr) { atomic.Xadd(&root.nwait, -1) unlock(&root.lock) break } root.queue(addr, s, lifo) goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes) if s.ticket != 0 || cansemacquire(addr) { break } } if s.releasetime > 0 { blockevent(s.releasetime-t0, 3+skipframes) } releaseSudog(s) } func cansemacquire(addr *uint32) bool { for { v := atomic.Load(addr) if v == 0 { return false } if atomic.Cas(addr, v, v-1) { return true } } } func semrelease(addr *uint32) { semrelease1(addr, false, 0) } func semrelease1(addr *uint32, handoff bool, skipframes int) { root := semtable.rootFor(addr) atomic.Xadd(addr, 1) if atomic.Load(&root.nwait) == 0 { return } lockWithRank(&root.lock, lockRankRoot) if atomic.Load(&root.nwait) == 0 { unlock(&root.lock) return } s, t0 := root.dequeue(addr) if s != nil { atomic.Xadd(&root.nwait, -1) } unlock(&root.lock) if s != nil { acquiretime := s.acquiretime if acquiretime != 0 { mutexevent(t0-acquiretime, 3+skipframes) } if s.ticket != 0 { throw("corrupted semaphore ticket") } if handoff && cansemacquire(addr) { s.ticket = 1 } readyWithTime(s, 5+skipframes) if s.ticket == 1 && getg().m.locks == 0 { goyield() } } }
12行 试图获取sema信号量锁
52行 cansemacquire()使用了atomic包下的Load()和Cas(),也就是说使用CPU级别的锁,避免多协程并发数据不一致的问题。
70行 加1,就是释放sema信号量锁。
总结: sema是一个uint类型,
sema>0的情况, 获取信号量锁(sema-1),释放信号量锁(sema+1)
55-57行 sema==0 返回false。
40行 在semaRoot中,把s添加到block goroutines的队列中去
41行 进入休眠,不在调度,也就是会停到这个地方
79行 出队,拿出一个协程
Mutex->State
Locked:是否被上锁(0-未上锁,1-已上锁)
Woken:是否被唤醒
Starving:是否饥饿
WaiterShift:记录等待这个锁的协程的数量
非饥饿模式下的加锁和解锁 加锁流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 const ( mutexLocked = 1 << iota // mutex is locked mutexWoken mutexStarving mutexWaiterShift = iota } func (m *Mutex) Lock() { if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return } m.lockSlow() } func (m *Mutex) lockSlow() { var waitStartTime int64 starving := false awoke := false iter := 0 old := m.state for { if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue } new := old if old&mutexStarving == 0 { new |= mutexLocked } if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } if starving && old&mutexLocked != 0 { new |= mutexStarving } if awoke { if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } new &^= mutexWoken } if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&(mutexLocked|mutexStarving) == 0 { break } queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } runtime_SemacquireMutex(&m.sema, queueLifo, 1) starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state if old&mutexStarving != 0 { if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } } if race.Enabled { race.Acquire(unsafe.Pointer(m)) }
9行 如果state是0,就改成1,也就是拿到锁,然后就return
15行 第一时间没拿到锁,进入lockSlow()
25行 是否被锁,是否饥饿,是否可以被自旋
26-29行 判断是否刚刚被唤醒
30行 继续自旋
31行 记录自旋次数
33行 回到上面for的地方,继续执行语句
37-39行 如果不是饥饿状态,那么就设置成加锁状态
40-42行 如果不再自旋,并且仍然是锁定状态,我们就把等待数量+1,然后进入52行,把新状态写入进去。
52行 进行加锁成功
53-55行 确认是否加锁成功,如果成功,就break出上面的for循环。
60行 如果sema==0,就会让协程休眠等待,进入队列,协程就会停在这一行,进入休眠等待
总结:
在多协程的情况下,每个协程尝试cas直接加锁,总有一个成功,那么其他的协程获取sema,但是sema这个时候是0,所以只能放到semaRoot的treap里,进行休眠等待,同时,wiaterShift,也会增加对应进入休眠等待协程的数量,当锁被释放后,会叫醒休眠等待的协程。
无法直接获取锁,多次自旋,多次失败,就进入sema队列等待。
解锁流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 func (m *Mutex) Unlock() { if race.Enabled { _ = m.state race.Release(unsafe.Pointer(m)) } new := atomic.AddInt32(&m.state, -mutexLocked) if new != 0 { m.unlockSlow(new) } } func (m *Mutex) unlockSlow(new int32) { if (new+mutexLocked)&mutexLocked == 0 { fatal("sync: unlock of unlocked mutex") } if new&mutexStarving == 0 { old := new for { if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false, 1) return } old = m.state } } else { runtime_Semrelease(&m.sema, true, 1) } }
6-9行 减去1,如果new是0,证明没有等待的Goroutine,否则就会进入unlockSlow()
19行 如果是锁的状态或者唤醒状态或者饥饿状态或者更多的Goroutine的等待,就return
24行 从睡眠等待的Goroutine队列里,释放出来一个Goroutine,这个被释放的Goroutine会去Lock()的第61行,再去执行,然后再次进入加锁中的for循环,再次判断是否自旋,再次和其他Goroutine去竞争锁。
饥饿模式下加锁/解锁 当前Goroutine执行超过1毫秒,就会变成饥饿状态,饥饿模式下,新来的或者被唤醒的Goroutine,都不自旋,获取不到锁,就直接休眠等待。饥饿模式下,被唤醒的Goroutine直接获取锁。只有当没有Goroutine在队列中继续等待,就变成了正常模式。
其实,这个和我们买东西差不多,也就是先到先得,进行排队,如果再有新来的,就去队尾接着排,最终再按排队顺序执行,就可以了。
饥饿模式好处:
降低系统负担,只要是饥饿模式,新来的Goroutine直接进入队列,减少了自旋给系统带来的压力。
公平原则,只要出队列,直接拿到锁,不用和新来的Goroutine再次进行竞争锁。因为之前他已经参与了竞争和自旋,花费了时间,这次就不用竞争和等待,直接可以拿到锁。
加锁流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 func (m *Mutex) lockSlow() { var waitStartTime int64 starving := false awoke := false iter := 0 old := m.state for { if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue } new := old if old&mutexStarving == 0 { new |= mutexLocked } if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } if starving && old&mutexLocked != 0 { new |= mutexStarving } if awoke { if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } new &^= mutexWoken } if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&(mutexLocked|mutexStarving) == 0 { break } queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } runtime_SemacquireMutex(&m.sema, queueLifo, 1) starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state if old&mutexStarving != 0 { if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } } if race.Enabled { race.Acquire(unsafe.Pointer(m)) }
44行 判断是否是饥饿模式,如果starving是true,证明未来要设置饥饿模式,然后走到for循环后,再次循环到26行。
26-27行 设置变量new为饥饿模式
35行 写入饥饿模式,此时,锁就是饥饿状态了,再从新执行for循环的时候,新来的goroutine不再自旋,等待数量+1,就进入队列,然后执行runtime_SemacquireMutex()。
46-56行 饥饿模式下,唤醒的goroutine直接拿到锁。
如何学习Go语言微服务,快速步入架构师
添加微信
公众号更多内容