go mutex源码分析


使用Mutex

互斥锁Mutex提供了两个函数Lock和Unlock。

func(m *Mutex) Lock()
func(m *Mutex) Unlock()

源码分析

Mutex实现演变过程

go mutex源码分析

初版

// 互斥锁的结构,包含两个字段 
type Mutex struct { 
    key  int32 // 锁是否被持有的标识 
    sema int32 // 信号量专用,用以阻塞/唤醒goroutine 
}

go mutex源码分析

func (m *Mutex) Lock() { 
    if xadd(&m.key, 1) == 1 { //标识加1,如果等于1,成功获取到锁 
        return 
    } 
    semacquire(&m.sema) // 否则阻塞等待 
}

CAS+1后,如果key值是1,那么加锁成功;如果key值不是1,那么锁已经被持有,阻塞自己,直到锁释放。

func (m *Mutex) Unlock() { 
    if xadd(&m.key, -1) == 0 { // 将标识减去1,如果是0,那么说明没有等待者
        return 
    } 
    semrelease(&m.sema) // 唤醒一个阻塞的goroutine
}

因为Mutex没有保存持有锁的goroutine信息,所以Mutex不是可重入的锁,Unlock函数可以被没持有这个锁的goroutine调用来释放锁。

缺点:先来先得,新人没有机会先拿到锁。

给新人机会

新来的和被唤醒的goroutine一起争抢锁,打破先来先得的逻辑。

type Mutex struct { 
    state int32 
    sema  uint32 
} 

const ( 
    mutexLocked = 1 << iota // 1 
    mutexWoken // 2
    mutexWaiterShift = iota // 2
)

go mutex源码分析

state是一个复合型字段,倒数第1位表示这个锁是否被持有,倒数第2位表示是否有唤醒的goroutine,剩余的位数表示等待锁的goroutine数量。

func (m *Mutex) Lock() { 
    // 没有阻塞等待和唤醒的goroutine,直接获取到锁
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { 
        return
    } 
 
    awoke := false 
    for { 
        old := m.state 
		
		// 准备新状态
        new := old | mutexLocked // 标记持有锁 
        if old&mutexLocked != 0 { // 原来已加锁
        	new = old + 1<<mutexWaiterShift //等待者数量加1,后面不可能获取到锁
        } 
        if awoke { 
            // goroutine是被唤醒的
            // 新状态清除唤醒标志,倒数第2位设置成0,每次只有一个被唤醒
            new &^= mutexWoken 
        } 

		// cas成功后继续判断原状态是否已加锁
        if atomic.CompareAndSwapInt32(&m.state, old, new) { // 设置新状态 
            if old&mutexLocked == 0 { // 原状态未加锁 
            	break 
            } 
            runtime.Semacquire(&m.sema) // 阻塞
            awoke = true 
        } 
    } 
}

相比于新来的goroutine,被唤醒的goroutine需要清除mutexWoken标志。

请求锁的goroutine

当前锁被持有

当前锁未被持有

新来的goroutine

waiter++;休眠

获取到锁

被唤醒的goroutine

清除mutexWoken标志; waiter++;休眠

清除mutexWoken标志;获取到锁

func (m *Mutex) Unlock() {
	// 直接释放锁
    new := atomic.AddInt32(&m.state, -mutexLocked) // -1来去掉锁标志 
    if (new+mutexLocked)&mutexLocked == 0 {        // 原来未加锁直接panic 
    	panic("sync: unlock of unlocked mutex") 
    } 
 
    old := new 
    for { 
    	if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 { // 没有等待者,或者有唤醒的waiter,或者锁又被加上 
    		return 
    	} 
    	new = (old - 1<<mutexWaiterShift) | mutexWoken // 等待者数量-1,设置唤醒标志
    	if atomic.CompareAndSwapInt32(&m.state, old, new) { 
    		runtime.Semrelease(&m.sema) // 唤醒一个等待者 
    		return 
    	} 
    	old = m.state 
    } 
}

多给些机会

如果goroutine获取不到锁,那么通过多次循环的自旋方式来尝试获取锁,在一定程度上避免阻塞和唤醒。

 

判断是否可以进入自旋

runtime_canSpin的实现在sync_runtime_canSpin

1. 互斥锁处于正常模式

2. 机器是多核CPU,且GOMAXPROCS>1

3. 至少存在一个正在运行的其他处理器P,它的本地运行队列为空

4. 累计自旋次数小于4

 

自旋(空转CPU)

runtime_doSpin的实现在sync_runtime_doSpin

执行30次 PAUSE 指令,消耗CPU 时间。

 

缺点:饥饿问题依然存在。

解决饥饿

饥饿模式是在Go 1.9版本引入的,解决如下2个问题:

1. 在极端情况下,锁一直由新来的goroutine获得,等待中的goroutine一直获取不到锁。

2. goroutine阻塞时会放在等待队列的尾部,尾部的goroutine很难获得锁。

go mutex源码分析

state字段增加了饥饿标记,即增加饥饿模式。

 

go 1.15.5

Mutex有正常模式和饥饿模式。

正常模式下,刚被唤醒的waiter与新来的goroutine(优势在于已在CPU上运行,可能不止一个新来的)竞争锁时,waiter极有可能失败,需要在等待队列中排队。

饥饿模式下,锁的所有权直接从解锁的goroutine转移到等待队列中的队头waiter即直接获取到锁。新来的goroutine不会尝试去获取锁,也不会自旋,进入等待队列的队尾。

与饥饿模式相比,正常模式下的互斥锁性能更好。因为相比于将锁的所有权赋予给唤醒的waiter,直接竞争锁能降低整体goroutine获取锁的延时。

模式切换

如果已被其他goroutine加锁且当前goroutine等待锁的时间超过了1ms,则进入饥饿模式。

go mutex源码分析

如果当前goroutine是最后一个waiter或者等待的时间小于1ms,则进入正常模式。

go mutex源码分析

空闲状态指state=0。

加锁

  1. 如果锁是空闲状态,则通过CAS直接加锁。
  2. 如果锁处于正常模式且已加锁,则尝试自旋和新来的冒充被唤醒的,最多自旋4次。
  3. 计算锁的期望状态,尝试更新锁状态。
  4. 在更新锁状态成功后,如果原来已加锁且没有处于饥饿,那么直接退出。
  5. 当前goroutine不能获取到锁时睡眠,等待解锁的goroutine唤醒。
  6. 被唤醒的goroutine发现锁处于饥饿模式则直接拿到锁,否则重新进入步骤2中。

解锁

  1. 解除加锁标志位,如果锁变为空闲状态,则直接解锁。
  2. 如果解锁一个没有上锁的锁,则直接抛出异常。
  3. 正常模式下,如果没有goroutine等待锁释放,或者锁被其他goroutine设置成了锁定状态、唤醒状态、饥饿模式中的任一种,则直接退出;否则,唤醒队列队头waiter。

饥饿模式下,直接将锁的所有权交给等待队列队头waiter,被唤醒的waiter会负责设置Locked标志位。

 

参考官方测试用例(没有考虑等待队列等待数量超过1的情况)

sync/mutex_test.go

func TestMutexFairness(t *testing.T) 

package main

import (
   "fmt"
   "sync"
   "time"
)

func main() {
   var mu sync.Mutex
   stop := make(chan bool)
   defer close(stop)
   go func() {
      // 模拟新来的goroutine
      for {
         mu.Lock()
         fmt.Println("get lock before sleep")
         time.Sleep(100 * time.Microsecond)
         mu.Unlock()
         select {
         case <-stop:
            return
         default:
         }
      }
   }()
   done := make(chan bool)
   go func() {
      // 模拟等待队列中的goroutine
      for i := 0; i < 10; i++ {
         time.Sleep(100 * time.Microsecond)
         mu.Lock()
         fmt.Println("get lock after sleep")
         mu.Unlock()
      }
      done <- true
   }()
   select {
   case <-done:
   case <-time.After(10 * time.Second):
      fmt.Println("can't acquire Mutex in 10 seconds")
   }
}

结果是

get lock before sleep
get lock before sleep
get lock before sleep
get lock before sleep
get lock after sleep
get lock before sleep
get lock before sleep
get lock before sleep
get lock before sleep
get lock after sleep
get lock before sleep
get lock before sleep
get lock before sleep
get lock before sleep
get lock after sleep
get lock before sleep
get lock before sleep
get lock before sleep
get lock before sleep
get lock after sleep
get lock before sleep
get lock before sleep
get lock before sleep
get lock after sleep
get lock before sleep
get lock before sleep
get lock before sleep
get lock before sleep
get lock after sleep
get lock before sleep
get lock before sleep
get lock before sleep
get lock before sleep
get lock after sleep
get lock before sleep
get lock before sleep
get lock before sleep
get lock before sleep
get lock after sleep
get lock before sleep
get lock before sleep
get lock before sleep
get lock before sleep
get lock after sleep
get lock before sleep
get lock before sleep
get lock before sleep
get lock after sleep
get lock before sleep

参考资料

https://blog.csdn.net/EDDYCJY/article/details/120137010

https://time.geekbang.org/column/article/295850

原创文章,作者:jamestackk,如若转载,请注明出处:https://blog.ytso.com/276374.html

(0)
上一篇 2022年7月23日
下一篇 2022年7月23日

相关推荐

发表回复

登录后才能评论