type RWMutex struct 🚀

  1. runtime/rwmutex.go中有这个文件的修改过的副本。如果你在这里做了任何更改,看看是否应该在那里也做更改。
  2. RWMutex 是一种 读/写 互斥锁。该锁可以由任意数量的或单个持有。
  3. RWMutex 的零值是一个未锁定的互斥锁。RWMutex 在第一次使用后不能复制。
  4. 如果一个goroutine持有一个用于读取的RWMutex,而另一个goroutine可能会调用Lock,
  5. 那么任何goroutine都不应该期望能够获得一个读锁,直到初始的读锁被释放。
  6. 特别是,这禁止了递归读锁定。这是为了确保锁最终可用。被阻塞的锁调用会排除新的读取器获取锁。
  7. RWMutex:读写互斥锁 1) 该锁可以被同时多个读取者持有或唯一写入者持有 2) RWMutex可以创建为其他结构体的字段 3) 零值为解锁状态
  8. RWMutex 类型的锁也和线程无关,可以由不同的线程加读取锁/写入和解读取锁/写入锁
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// There is a modified copy of this file in runtime/rwmutex.go.
// If you make any changes here, see if you should make them there.

// A RWMutex is a reader/writer mutual exclusion lock.
// The lock can be held by an arbitrary number of readers or a single writer.
// The zero value for a RWMutex is an unlocked mutex.
//
// A RWMutex must not be copied after first use.
//
// If a goroutine holds a RWMutex for reading and another goroutine might
// call Lock, no goroutine should expect to be able to acquire a read lock
// until the initial read lock is released. In particular, this prohibits
// recursive read locking. This is to ensure that the lock eventually becomes
// available; a blocked Lock call excludes new readers from acquiring the
// lock.
type RWMutex struct {
    // 1) 一把互斥锁,保护以下字段
    
    // 所有写goroutine争抢sync.Mutex锁的goroutine都这这里排队等待
    w           Mutex  // held if there are pending writers
    
    // 2) semaphore 读写等待池
    
    // 获取到 sync.Mutex 锁gorutine,并等待正在运行 读goroutine 时,该写goroutine在这里等待
    // 因此这里只可能是只有一个写goroutine在等待或者没有
    writerSem   uint32 // semaphore for writers to wait for completing readers
    // readerSem 记录着所有等待读的协程,当有写操作正在进行中,后面来的读操作全部排队等待在这里
    // 等待正在进行中的读操作完成后释放writerSem中的写操作完成后,这里排队的读协程将被释放
    readerSem   uint32 // semaphore for readers to wait for completing writers
    
    // 3) 读等待数量
    
    // readerCount 记录的所有的读goroutine数量(【正在执行的goroutine】+【等待在readerSem中的goroutine】),
    // 调用RLock方法该值就会加一
    // 当有写goroutine获取到sync.Mutex时,会将该值原子操作减去rwmutexMaxReaders变成负数,
    // 告知RLock方法有写操作在进行,goroutine去readerSem吧
    readerCount int32  // number of pending readers
    // 在获取到Mutex后,记录当前【正在进行的读goroutine数量】,不包括存在readerSem排队的,这些读goroutine正在工作线程上运行
    // 在当前写操作开始时等待正在运行全部读goroutine的数量,注意这里可以是负数
    readerWait  int32  // number of departing readers
}

const

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 2^30 = 1,073,741,824	完全能满足读取的数量
const rwmutexMaxReaders = 1 << 30	// 最大读取数量

// Happens-before relationships are indicated to the race detector via:
// - Unlock  -> Lock:  readerSem
// - Unlock  -> RLock: readerSem
// - RUnlock -> Lock:  writerSem
//
// The methods below temporarily disable handling of race synchronization
// events in order to provide the more precise model above to the race
// detector.
//
// For example, atomic.AddInt32 in RLock should not appear to provide
// acquire-release semantics, which would incorrectly synchronize racing
// readers, thus potentially missing races.
//
// Happens-before关系通过以下方式指示竞争检测器:
// 	Unlock  -> Lock:  readerSem
//  Unlock  -> RLock: readerSem
//  RUnlock -> Lock:  writerSem
//
// 下面的方法暂时禁用了竞争同步事件的处理,以便为竞争检测器提供更精确的模型。
// 例如,RLock中的 atomic.AddInt32 看起来不应该提供 获取-释放语义,这将不正确地同步竞争的阅读器,从而可能错过竞争。

字段和方法描述

Lock()、Unlock()
  • 写操作时调用的方法,如果锁已被reader或waiter持有,那么Lock方法会一直阻塞,直到能获取到锁
    1. 写操作时,如果锁被readers持有,那么将等待所有的reader解锁,返回写操作获得锁,这期间还有来的reader全部去排队等待,等待写操作解锁写操作解锁期间先把等待在排队的全部释放出来,然后再去解锁互斥锁。在互斥锁解锁小段时间来的读操作直接获取锁不需要去排队,互斥锁解锁后才允许排队的写操作或正在来的写操作去争抢互斥写锁
    2. 写操作时,如果锁已被waiter持有,那么当前写操作等待在RWRutex.w.seam信号量中,等待前面一个写锁完成,此时来的读操作全部阻塞起,已经在进行的读操作正常进行
  • Unlock方法是配对的释放锁的方法

RLock()、RUnlock()
  • 读操作时调用的方法,如果锁已经被writer持有的话,RLock方法会一直阻塞,直到能获取到锁,否则就直接返回
    1. 读操作时没有waiter持有锁情况,直接记录readerCount加一,返回就返回,表示获取到锁
    2. 读操作时存在waiter持有锁情况,则当前读操作排队在readerSem,等待当前写完成
  • 而RUnlock是reader释放锁的方法

RLocker()
  • 这个方法的作用是为读操作返回一个Locker接口的对象,它的Lock方法会调用RWMutex的RLock方法,它的Unlock方法会调用RWMutex的RUnlock方法

Lock()

  1. Lock 锁定 rw 用于写入。
  2. 如果锁已经锁定用于读或写,那么锁将阻塞,直到锁可用为止。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// Lock locks rw for writing.
// If the lock is already locked for reading or writing,
// Lock blocks until the lock is available.
func (rw *RWMutex) Lock() {
    if race.Enabled {
        _ = rw.w.state
        race.Disable()
    }
    
    // 1) 尝试获取 sync.Mutex 锁
    
    // First, resolve competition with other writers.
    //
    // 首先,解决与其他 writers 的竞争。
    // 如果存在多个写gorutine的都在调用Lock竞争锁,这里需要先去竞争锁
    rw.w.Lock()	// sync.Mutex
    
    // 2) 原子修改readerCount值,告诉后面读goroutine调用Rlock函数需要去readerSem中挂起
    // 由于刚获取到锁,因此此时只存在正在运行读goroutine和等待在writerSem中写goroutine,不存在等待在readerSem中的goroutine
    // 因此 readerCount 存储的是正在运行读goroutine,在下面这行原子操作执行前都认为读goroutine是不需要挂起的
    
    // Announce to readers there is a pending writer.
    // 
    // 通过把rw.readerCount设置成一个负数,来告知其他读goroutine当前有写的goroutine正在等待进入临界区
    // atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders); 告诉后面来的读操作去排队等待,你们应该在本次写操作完后再去读取数据
    // 由于当前刚获取到 Mutex,所以这里的 r 应该表示当前正在运行的读goroutine的数量,不包含被挂起的 goroutine
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders	// 原子操作表示当前有写的协程来了
    
    // 在上面这行原子操作执行后,都有来的读goroutine在调用RLock方法时,都会被挂起在readerSem中
    // 因此 r 是所有正在运行读goroutine的数量
    // 上面的原子操作与RLock函数的原子操作形成一对临界区域互斥
    
    // Wait for active readers.
    // 
    //  1. r != 0:存在正在运行的读goroutine
    //  2. atomic.AddInt32(&rw.readerWait, r) != 0:继续判断这段时间呢这些读goroutine是否全部读取完,
    // 没有读取完这里需要把当前写goroutine挂起在writerSem
    // 这里的原子操作与RUlock函数的 "atomic.AddInt32(&rw.readerWait, -1) == 0" 形成临界区域互斥
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {	// readerWait记录着正在运行中,还没有调用RUnlock的goroutine
        // 存在需要等待读的协程,把当前协程加入writerSem写信息池
        // false:这里加入的是尾部,由于writerSem只可能存一个写goroutine
        runtime_SemacquireMutex(&rw.writerSem, false, 0)	
    }
    if race.Enabled {
        race.Enable()
        race.Acquire(unsafe.Pointer(&rw.readerSem))
        race.Acquire(unsafe.Pointer(&rw.writerSem))
    }
}

Unlock()

  1. Unlock 方法解除rw的写入锁状态。
  2. 如果 rw 在进入解锁时没有锁定写入,这是一个运行时错误。
  3. 与 Mutexes 一样,一个被锁的 RWMutex 与一个特定的 goroutine 无关。
  4. 一个 goroutine 可以 RLock(锁定)一个 RWMutex,然后安排另一个 goroutine 运行 RUnlock(解锁)它。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// Unlock unlocks rw for writing. It is a run-time error if rw is
// not locked for writing on entry to Unlock.
//
// As with Mutexes, a locked RWMutex is not associated with a particular
// goroutine. One goroutine may RLock (Lock) a RWMutex and then
// arrange for another goroutine to RUnlock (Unlock) it.
func (rw *RWMutex) Unlock() {
    if race.Enabled {
        _ = rw.w.state
        race.Release(unsafe.Pointer(&rw.readerSem))
        race.Disable()
    }
    
    // 1) 当Unlock方法被调用,也就说明数据的相关写操作已经完成了,此时其他来读的goroutine可以正常读取新数据

    // Announce to readers there is no active writer.
    // 
    // 告诉所有RLock的协程,没有正在写的锁,此时来读的协程不必等待直接可以读取到数据
    // 因为程序调用了Unlock方法代表我们前面以把数据更新了,此时在信号量中等待读的协程和此时后面来读的协程都可以安全读取数据了
    // 这里与RLock函数的 "atomic.AddInt32(&rw.readerCount, 1) < 0" 形成临界区互斥
    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)	// 这里的r全都是等待在信号量的数量
    
    // 没有调用Lock方法,而是直接调用Unlock方法这里会报错
    if r >= rwmutexMaxReaders {
        race.Enable()
        throw("sync: Unlock of unlocked RWMutex")
    }
    
    // Unblock blocked readers, if any.
    // 
    // 释放掉等待在信号量的协程,注意这里是释放完了才把互斥锁解锁的才允许其他写操作进行
    // 因为前面的数据已经更新了,所以这里需要把在信号量中的协程全部放在P本地队列或全局队列中等待调度器调度去来运行
    // 这里也是为什么其他写goroutine获取到Mutex锁时,不存在等待在readerSem上的读goroutine的原因,因为Mutex解锁在后面一步
    for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false, 0)	// 取出等待在readerSem的写goroutine
    }
    
    // Allow other writers to proceed.
    // 
    // 允许其他 writers 继续
    rw.w.Unlock()	// sync.Mutex
    if race.Enabled {
        race.Enable()
    }
}

TryLock()

  1. TryLock 试图锁定 rw 进行写入,并报告是否成功。
  2. 请注意,虽然确实存在正确使用 TryLock 的情况,但很少,而且 TryLock 的使用通常表明互斥量的特定使用中存在更深层的问题。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// TryLock tries to lock rw for writing and reports whether it succeeded.
//
// Note that while correct uses of TryLock do exist, they are rare,
// and use of TryLock is often a sign of a deeper problem
// in a particular use of mutexes.
func (rw *RWMutex) TryLock() bool {
    if race.Enabled {
        _ = rw.w.state
        race.Disable()
    }
    
    // 尝试获取Mutex锁
    if !rw.w.TryLock() {
        if race.Enabled {
            race.Enable()
        }
        return false
    }
    
    // 原子交换 readerCount 由 0 -> -rwmutexMaxReaders
    // 可见只有在没有读goroutine的时候,TryLock函数才会返回成功
    if !atomic.CompareAndSwapInt32(&rw.readerCount, 0, -rwmutexMaxReaders) {
        // 存在其他正在读写协程
        rw.w.Unlock()	// 解锁互斥锁
        if race.Enabled {
            race.Enable()
        }
        return false
    }
    
    if race.Enabled {
        race.Enable()
        race.Acquire(unsafe.Pointer(&rw.readerSem))
        race.Acquire(unsafe.Pointer(&rw.writerSem))
    }
    return true
}

RLock()

  1. RLock设置rw读锁。
  2. 它不应该用于递归的读锁定;被阻塞的锁调用会排除新的读取器获取锁。请参阅RWMutex类型的文档。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// RLock locks rw for reading.
//
// It should not be used for recursive read locking; a blocked Lock
// call excludes new readers from acquiring the lock. See the
// documentation on the RWMutex type.
func (rw *RWMutex) RLock() {
    if race.Enabled {
        _ = rw.w.state
        race.Disable()
    }
    // 把rw.readerCount加一,如果该值小于0,说明存在其他goroutine正在写操作,也就是前面的Lock方法
    // 这里也表明了readerCount字段是记录所有写goroutine的数量
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
        // A writer is pending, wait for it. 
        // 
        // 一个写锁正在继续,等待它完成
        runtime_SemacquireMutex(&rw.readerSem, false, 0)	// 将当前读goroutine挂在readerSem上
    }
    if race.Enabled {
        race.Enable()
        race.Acquire(unsafe.Pointer(&rw.readerSem))
    }
}

RUnlock()

  1. RUnlock 解除一个 RLock 调用。
  2. 它不会影响其他同时阅读的读者。
  3. 如果rw在进入RUnlock时没有锁定读取,则是一个运行时错误。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// RUnlock undoes a single RLock call;
// it does not affect other simultaneous readers.
// It is a run-time error if rw is not locked for reading
// on entry to RUnlock.
func (rw *RWMutex) RUnlock() {
    if race.Enabled {
        _ = rw.w.state
        race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
        race.Disable()
    }
    // 把前面读加锁减一,如果r小于0,说明正在进行写操作中
    // 这里也有一种可能是没有存在写操作中又没有调用Rlock函数调用了RUnlock函数导致总有一个goroutine这里rw.readerCount=-1
    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
        // Outlined slow-path to allow the fast-path to be inlined
        // 
        // 有存在写在进行,因此需要判断当前是否需要取出该写goroutine
        // 原因是可能存在读goroutine在运行中,该写goroutine在writerSem中等待
        rw.rUnlockSlow(r)	
    }
    if race.Enabled {
        race.Enable()
    }
}

rUnlockSlow()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
func (rw *RWMutex) rUnlockSlow(r int32) {
    // 判断没有调用RLock函数调用RUnlock函数报错
    //  1. r+1 == -rwmutexMaxReaders => r + rwmutexMaxReaders == 1
    //  2. r+1 == 0,就是判断上面没有调用RLock函数调用RUnlock函数时情况
    if r+1 == 0 || r+1 == -rwmutexMaxReaders {
        race.Enable()
        throw("sync: RUnlock of unlocked RWMutex")
    }
    
    // A writer is pending.
    // 
    // rw.readerWait 记录着当前正在运行的goroutine没在信号池的数量,这里判断是否已经是最后一个
    // 这里的原子操作和Lock函数中的 "atomic.AddInt32(&rw.readerWait, r) != 0" 形成临界区互斥
    if atomic.AddInt32(&rw.readerWait, -1) == 0 {	// 如果是最后一个,把等待写的goroutine取出
        // The last reader unblocks the writer.
        runtime_Semrelease(&rw.writerSem, false, 1)	// 从writerSem中取出等待在这里的读goroutine
    }
}

TryRLock()

  1. 尝试获取读锁,该方法只要不存在写协程都会获取读锁成功。
  2. TryRLock 试图锁定 rw 以进行读取,并报告是否成功。
  3. 请注意,虽然确实存在对 TryRLock 的正确使用,但很少,而且 TryRLock 的使用通常表明互斥量的特定使用中存在更深层的问题。
  4. 该方法在没有写操作的情况下是一定能拿去到锁的。存在写操作时才会返回 false。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// TryRLock tries to lock rw for reading and reports whether it succeeded.
//
// Note that while correct uses of TryRLock do exist, they are rare,
// and use of TryRLock is often a sign of a deeper problem
// in a particular use of mutexes.
func (rw *RWMutex) TryRLock() bool {
    if race.Enabled {
        _ = rw.w.state
        race.Disable()
    }
    
    // TryRLock 函数获取到读锁只能发生在 readerCount >= 0 状态下
    
    for {
        c := atomic.LoadInt32(&rw.readerCount)	// 原子读取 readerCount
        if c < 0 { // 写操作在进行中或者在等待读完成。
            if race.Enabled {
                race.Enable()
            }
            return false
        }
        
        // 尝试原子交换 readerCount 值
        // 如果交换失败可能有其他读操作或写操作发生,再次循环。
        if atomic.CompareAndSwapInt32(&rw.readerCount, c, c+1) {
            if race.Enabled {
                race.Enable()
                race.Acquire(unsafe.Pointer(&rw.readerSem))
            }
            return true	// 获取到锁后
        }
    }
}

RLocker()

  1. RLocker 返回一个 Locker 接口,通过调用 rw 实现 Lock 和 Unlock 方法。RLock 和 rw.RUnlock。
  2. 意义在于返回接口Locker限制只能调用接口的,比如 sync.Cond 中需要的锁
1
2
3
4
5
// RLocker returns a Locker interface that implements
// the Lock and Unlock methods by calling rw.RLock and rw.RUnlock.
func (rw *RWMutex) RLocker() Locker {
    return (*rlocker)(rw)
}

type Locker interface

  1. Locker接口代表一个可以加锁和解锁的对象。
  2. 该接口定义在sync/mutex.go文件中。
1
2
3
4
type Locker interface {
    Lock()
    Unlock()
}

type rlocker RWMutex

1
type rlocker RWMutex	// sync.RWMutex

Lock()

1
2
3
4
5
6
func (r *rlocker) Lock() { 
    // 因为 rlocker 和 RWMutex是两个类型,虽然底层一样
    // 但是 rlocker 只支持 Lock() 和 Unlock() 方法
    // 需要转换成 (*RWMutex) 才能调用 RLock() 方法。
    (*RWMutex)(r).RLock()
}

Unlock()

1
2
3
func (r *rlocker) Unlock() { 
    (*RWMutex)(r).RUnlock() 
}

使用示例

  • 读写锁:是多读单写互斥锁,分别针对读操作和写操作进行锁定和解锁操作
    • 经常用于读次数远远多于写次数的场合
  • 在Go语言中,读写锁由结构体类型 sync.RWMutex 实现
  • 基本遵守原则:
    • 写锁定情况下,对读写锁定进行读锁定或写锁定,都将阻塞,而且读锁与写锁之间是互斥的
    • 读锁定情况下,对读写锁进行写锁定,将阻塞
      • 加读锁时不会阻塞,即可多读
    • 对未被写锁定的读写锁进行写解锁,会引发运行时异常
    • 对未被读读锁定的读写锁进行读解锁时也会引发运行时异常
    • 写解锁在进行的同时会试图唤醒所有因进行读锁定而被阻塞的协程
    • 读解锁在进行的时候则会试图唤醒一个因进行写锁定而被阻塞的协程
  • 与互斥锁类型,sync.RWMutex 类型的零值就已经是立即可用的读写锁了
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package main

import (
    "fmt"
    "sync"
    "time"
)

var m *sync.RWMutex

func main() {
    wg := sync.WaitGroup{}
    wg.Add(20)

    var rwMutex sync.RWMutex

    Data := 0

    for i := 0; i < 10; i++ {
        go func(t int) {
            rwMutex.RLock()			// 读加锁
            defer rwMutex.RUnlock()	// 读解锁
            fmt.Printf("读数据:%v %d\n", Data, i)
            wg.Done()
            time.Sleep(1 * time.Second)
            // 这句代码第一次运行后,读解锁
            // 循环到第二个时, 读锁定后,这个goroutine就没有阻塞,同时读成功
        }(i)

        go func(t int) {
            rwMutex.Lock()			// 写加锁
            defer rwMutex.Unlock()	// 写解锁
            Data += 1
            fmt.Printf("写数据:%v %d\n", Data, t)
            wg.Done()

            // 对读写锁进行读锁定或者写锁定,都将阻塞
            // 写锁定下是需要解锁后才能写的
            time.Sleep(5 * time.Second)
        }(i)
    }

    wg.Wait()
}
读数据:0 3
写数据:1 2
读数据:1 10
读数据:1 10
读数据:1 10
读数据:1 10
读数据:1 10
读数据:1 10
读数据:1 10
读数据:1 10
读数据:1 10
写数据:2 1
写数据:3 3
写数据:4 4
写数据:5 5
写数据:6 6
写数据:7 0
写数据:8 9
写数据:9 7
写数据:10 8
  • 通过程序运行的输出可以看到,在写锁定情况下,对读写锁进行锁定或者写锁定,都将阻塞
    • 把写数据中的Sleep设置更长时间,在第一次写锁定后,读数据也没有进行
  • 再次写锁定是在 rwMutex.Unlock() 完成后,才能进行 rwMutex.lock()
    • 而读数据时则可以多次读,不一定需要等 rwMutex.RUnlock() 完成