type Pool struct#
- Pool 是一组可以单独保存和检索的临时对象。
- 储存在 Pool 中的任何物品都可以在任何时间自动移除,无需通知。如果在此发生时 Pool 持有唯一的引用,则可能会释放该元素。
- Pool 可以被多个goroutines同时使用。
- Pool's 的目的是缓存已分配但未使用的项,以便以后重用,减轻垃圾收集器的压力。也就是说,它使构建高效的、线程安全的空闲列表变得容易。但它并不适用于所有空闲链表。
- Pool 的适当使用是管理一组在包的并发独立客户端之间共享和可能被重用的临时项。Pool 提供了一种在多个客户端之间摊销分配开销的方法。
- 一个良好使用 Pool 的例子是fmt包,它维护了一个动态大小的临时输出缓冲区存储。store在负载下扩展(当许多goroutines正在积极打印时),在静默时收缩。
- 另一方面,作为生存期较短的对象的一部分维护的空闲列表不适合用于 Pool,因为在这种情况下开销不能很好地分摊。
- 在第一次使用后,不能复制池。
- sync.Pool 是协程安全的,使用前,设置好对象的 New 函数,用在 Pool 里没有缓存的对象时,创建一个。之后在程序的任何地方、任何时候仅通过 Get() 和 Put() 方法就可以取和还对象了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
// A Pool is a set of temporary objects that may be individually saved and
// retrieved.
//
// Any item stored in the Pool may be removed automatically at any time without
// notification. If the Pool holds the only reference when this happens, the
// item might be deallocated.
//
// A Pool is safe for use by multiple goroutines simultaneously.
//
// Pool's purpose is to cache allocated but unused items for later reuse,
// relieving pressure on the garbage collector. That is, it makes it easy to
// build efficient, thread-safe free lists. However, it is not suitable for all
// free lists.
//
// An appropriate use of a Pool is to manage a group of temporary items
// silently shared among and potentially reused by concurrent independent
// clients of a package. Pool provides a way to amortize allocation overhead
// across many clients.
//
// An example of good use of a Pool is in the fmt package, which maintains a
// dynamically-sized store of temporary output buffers. The store scales under
// load (when many goroutines are actively printing) and shrinks when
// quiescent.
//
// On the other hand, a free list maintained as part of a short-lived object is
// not a suitable use for a Pool, since the overhead does not amortize well in
// that scenario. It is more efficient to have such objects implement their own
// free list.
//
// A Pool must not be copied after first use.
type Pool struct {
// 使得内嵌了noCopy的对象在进行go vet静态检查的时候,可以检查出是否被复制
noCopy noCopy
// 访问时根据P的id去访问对应下标的local[pid]
// 通过这样的设计,多个goroutine使用同一个Pool时,减少了竞争,提升了性能
// local字段指向存储[P]poolloacl数组的指针,类型为[P]poolLocal
//
// local 是 [P]poolLocal 数组的首地址
// P 是当前P的数量,一般默认为CPU的核数
local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
// localSize 是上面 local 数组的大小
// 根据 localSize 判断是否初始化,pid为P的id改值是一个0的递增值
// 1. localSize 为0时,没有初始化
// 2. localSize <= pid 时,可能是没有初始化,也可能是P的数量发生了变化,变多了
localSize uintptr // size of the local array
// victim 和 victimSize 作为次级缓存使用,GC时将对象放入其中,下一次GC来临之前如果有Get调用则会从p.victim中取,直到再一次GC来时回收
// 从 p.victim 中取出对象使用完毕之后并未返回 p.victim 中(而是放回p.local)中,在一定程度上也减小了下一次GC的开销
// 原来1次GC的开销被拉长到2次切会有一定程度的开销减小,这就是 p.victim 引入的意图
// victim 和 victimSize 会在一轮GC到来时,分别"接管" local 和 localSize
// victim 的机制用于减少GC后冷启动导致的性能抖动,让分配对象更平滑
// sync.Pool 引入的意图在于降低GC压力的同时提高命中率
victim unsafe.Pointer // local from previous cycle 来自上一个周期的local
victimSize uintptr // size of victims array 来自上一个周期的local的大小
// New optionally specifies a function to generate
// a value when Get would otherwise return nil.
// It may not be changed concurrently with calls to Get.
//
// New可选地指定一个函数,用于在Get返回nil时生成一个值。
// 它不能在调用Get时同时改变。
New func() any // 我们指定的新建对象的方法
}
// > -----------------------------------------------------------------------------------
type poolLocal struct {
poolLocalInternal // 32 bytes
// Prevents false sharing on widespread platforms with
// 128 mod (cache line size) = 0 .
//
// 将poolLocal补齐至128字节(即两个cache line)的倍数,防止false sharing伪共享
// 仅占位用,防止在cache line上分配多个 poolLocalInternal
// 确保CPU缓存机制不同,一般建议确保有128字节距离
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte // 使poolLocal总共占128字节
// CPU Cache
// 现代cpu中,cache都划分成以cache line(cache block)为单位,在x86_64体系下一般都是64字节,cache line是操作的最小单元
// 程序即使只想读内存中的1个字节数据,也要同时把附近63节字加载到cache中,如果读取超个64字节,那么就要加载到多个cache line中
// 这样,访问后续63字节数据时就可以直接从cache line中读取,性能有很大提升
// false sharing
// 伪共享的非标准定义为:
// 缓存系统中是以缓存行(cache line)为单位存储的,当多线程修改互相独立的变量时,如果这些变量共享同一个缓存行,
// 就会令整个 cache line 失效,无意中影响彼此的性能,这就是伪共享
// 如果没用pad字段时,那么当需要访问0号索引的poolLocal时,CPU同时会把0号和1号索引同时加载到cpu cache,在只修改0号索引的情况下,
// 会让1号索引的poolLocal失效。这样,当其他线程想要读取1号索引时,发生cache miss,还得重新再加载,对性能有损,
// 增加一个pad,补齐缓存行,让相关的字段能独立地加载到缓存行就不会出现false sharding了
}
// > -----------------------------------------------------------------------------------
// Local per-P Pool appendix.
type poolLocalInternal struct {
// private只有当前P能用
private any // Can be used only by the respective P.
// 其他P都可以用,当private没有时优先去当前P的local.shared中取,如果还没有就去其他P中的local.shared中窃取一个来用
shared poolChain // Local P can pushHead/popHead; any P can popTail.
}
|
Variables#
- 全局变量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
var (
allPoolsMu Mutex // 全局互斥锁
// allPools is the set of pools that have non-empty primary
// caches. Protected by either 1) allPoolsMu and pinning or 2)
// STW.
//
// allPools 是具有非空主键缓存的 pool 集合
// 受任何一方保护 1) allPoolsMu 和 pinning 2) STW
// 在Get函数中,初始化时被保存在这里
allPools []*Pool // 保存来自用户创建的Pool实例,用户端可能创建多个Pool,比如fmt包创建的Pool也会保存在这里
// oldPools is the set of pools that may have non-empty victim
// caches. Protected by STW.
//
// oldPools是一组可能具有非空victim caches的池。受STW保护。
// 在GC开始时,保存allPools中的值
oldPools []*Pool // oldPools只是保存了 allPools 的值,可见是防止被GC回收相关数据
)
|
type poolLocal struct#
- 本地 Pool,对齐Cache line的倍数。
1
2
3
4
5
6
7
8
9
|
type poolLocal struct {
poolLocalInternal
// Prevents false sharing on widespread platforms with
// 128 mod (cache line size) = 0 .
//
// 用 128 mod (cache line size) = 0 防止在广泛传播的平台上 false sharing。
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
|
type poolLocalInternal struct#
1
2
3
4
5
6
7
8
9
|
// Local per-P Pool appendix.
type poolLocalInternal struct {
// private 私有的,只能由相应的P使用。
private any // Can be used only by the respective P.
// shared 共享的,local P 可以 pushHead/popHead; 任何P可以 popTail
// 当当前P的private没有,那么优先从当前P的shared中取,还没有则从其他P的shared中取,
// 还是没有如果New函数存在则使用该函数生成
shared poolChain // Local P can pushHead/popHead; any P can popTail.
}
|
type poolChain struct#
- poolChain是poolDequeue的动态版本。
- 参看 poolqueue.go 文档。(第二篇中介绍)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
// poolChain is a dynamically-sized version of poolDequeue.
//
// This is implemented as a doubly-linked list queue of poolDequeues
// where each dequeue is double the size of the previous one. Once a
// dequeue fills up, this allocates a new one and only ever pushes to
// the latest dequeue. Pops happen from the other end of the list and
// once a dequeue is exhausted, it gets removed from the list.
type poolChain struct {
// head is the poolDequeue to push to. This is only accessed
// by the producer, so doesn't need to be synchronized.
head *poolChainElt
// tail is the poolDequeue to popTail from. This is accessed
// by consumers, so reads and writes must be atomic.
tail *poolChainElt
}
|
Pool Methods#
Get()#
- 优先从当前 P 的 local.private 中取,没有则从当前 P 的 local.shared 中取,还没有则去其他 P 中 local.shared 中窃取一个
- 调用者不应该认为
Get
的返回值和传递给Put
值之间有任何关系
- 假如
Get
方法没有取得 item
,如 p.New
非 nil
,Get返回调用 p.New
的结果;否则返回nil
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
// Get selects an arbitrary item from the Pool, removes it from the
// Pool, and returns it to the caller.
// Get may choose to ignore the pool and treat it as empty.
// Callers should not assume any relation between values passed to Put and
// the values returned by Get.
//
// If Get would otherwise return nil and p.New is non-nil, Get returns
// the result of calling p.New.
func (p *Pool) Get() any {
if race.Enabled {
race.Disable()
}
l, pid := p.pin() // 返回当前工作线程所在的*poolLocal和pid
x := l.private // 取当前private上数据
l.private = nil // 并清零private
// 如果 local private 没有
if x == nil {
// Try to pop the head of the local shard. We prefer
// the head over the tail for temporal locality of
// reuse.
//
// 尝试从local shard的head取出。对于重复使用一时的locality我们更喜欢head而不是tail
x, _ = l.shared.popHead() // 尝试从shared的head弹出一个数据
if x == nil {
x = p.getSlow(pid) // 如果上面还未空,则去其他P中偷取,或从victim cache去拿去
}
}
runtime_procUnpin() // 允许当前工作线程被抢占
if race.Enabled {
race.Enable()
if x != nil {
race.Acquire(poolRaceAddr(x))
}
}
// 如果上面都没有拿到数据并且又定义了New方法调用该方法创建数据
if x == nil && p.New != nil {
x = p.New()
}
return x
}
|
pin()#
- pin 将当前 goroutine 固定到 P,禁用抢占并为 P 和 P 的id返回 poolLocal池。
- 调用者在处理池时必须调用runtime_procUnpin()。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
// pin pins the current goroutine to P, disables preemption and
// returns poolLocal pool for the P and the P's id.
// Caller must call runtime_procUnpin() when done with the pool.
func (p *Pool) pin() (*poolLocal, int) {
// 该函数主要作用是加锁M禁止当前M被抢占,然后返回M正绑定的P的id
pid := runtime_procPin()
// In pinSlow we store to local and then to localSize, here we load in opposite order.
// Since we've disabled preemption, GC cannot happen in between.
// Thus here we must observe local at least as large localSize.
// We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
//
// 在pinSlow中,我们存储到local,然后存储到localSize,在这里我们按相反的顺序加载。
// 因为我们已经禁用了抢占,所以GC不能在这两者之间发生。
// 因此,这里我们必须注意local至少为large localSize。
// 我们可以观察到一个 更新/更大 的 local,这是没问题的(我们必须观察到它的零初始化)。
s := runtime_LoadAcquintptr(&p.localSize) // load-acquire; 原子读取p.localSize值
l := p.local // load-consume; 存储数据的数组地址
// uintptr(pid) >= s; 可能 1)没有初始化过 2)P的数量变多了
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
return p.pinSlow() // 初始化去
}
// > ---------------------------------------------------------------------------------
// go1.19.3/src/runtime/proc.go
//go:linkname sync_runtime_procPin sync.runtime_procPin
//go:nosplit
func sync_runtime_procPin() int {
return procPin()
}
// > ---------------------------------------------------------------------------------
// go1.19.3/src/runtime/proc.go
//go:nosplit
func procPin() int {
_g_ := getg()
mp := _g_.m
mp.locks++
return int(mp.p.ptr().id)
}
// > ---------------------------------------------------------------------------------
// go1.19.3/src/runtime/proc.go
//go:linkname sync_runtime_procUnpin sync.runtime_procUnpin
//go:nosplit
func sync_runtime_procUnpin() {
procUnpin()
}
// > ---------------------------------------------------------------------------------
// go1.19.3/src/runtime/proc.go
//go:nosplit
func procUnpin() {
_g_ := getg()
_g_.m.locks--
}
// > ---------------------------------------------------------------------------------
// go1.19.3/src/sync/pool.go
//go:linkname runtime_LoadAcquintptr runtime/internal/atomic.LoadAcquintptr
func runtime_LoadAcquintptr(ptr *uintptr) uintptr
// > ---------------------------------------------------------------------------------
// go1.19.3/src/runtime/proc.go
// l是p.local,i是pid
func indexLocal(l unsafe.Pointer, i int) *poolLocal {
// 获取到i下标的数据地址
lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
return (*poolLocal)(lp)
}
|
pinSlow()#
- pinSlow 主要是完成 pool.local 的初始化创建
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
func (p *Pool) pinSlow() (*poolLocal, int) {
// Retry under the mutex.
// Can not lock the mutex while pinned.
runtime_procUnpin()
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
pid := runtime_procPin()
// poolCleanup won't be called while we are pinned.
s := p.localSize
l := p.local
// 再次检查的意义在于,可能出现此时Pool已经被初始化
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
if p.local == nil {
// 新初始化的Pool记录到allPools
allPools = append(allPools, p) // allPools是存储[]*Pool切片
}
// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
// 如果 GOMAXPROCS 在 GC 之间发生变化,我们将重新分配数组并丢失旧数组
// runtime.GOMAXPROCS 函数 参数是0或原大小值直接返回CPU中数量,其他则修改P的数量
size := runtime.GOMAXPROCS(0) // 返回P的总数量
local := make([]poolLocal, size) // 创建poolLocal类型切片,长度和容量都为size
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
runtime_StoreReluintptr(&p.localSize, uintptr(size)) // store-release
return &local[pid], pid
}
// > ---------------------------------------------------------------------------------
//go:linkname runtime_StoreReluintptr runtime/internal/atomic.StoreReluintptr
func runtime_StoreReluintptr(ptr *uintptr, val uintptr) uintptr
|
getSlow()#
- 从其他 P 的 share 中去偷取元素
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
func (p *Pool) getSlow(pid int) any {
// See the comment in pin regarding ordering of the loads.
// 请参阅pin中关于负载排序的注释。
// 原子读取 p.localSize
size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
locals := p.local // load-consume
// Try to steal one element from other procs.
// 尝试从其他进程中窃取一个元素。
for i := 0; i < int(size); i++ {
// 偷取顺序从当前P的下一个P开始遍历一圈
l := indexLocal(locals, (pid+i+1)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// Try the victim cache. We do this after attempting to steal
// from all primary caches because we want objects in the
// victim cache to age out if at all possible.
//
// 试试 victim cache。我们试图从所有primary caches中窃取数据后才这样做,
// 因为我们希望victim cache中的对象尽可能的过期
size = atomic.LoadUintptr(&p.victimSize) // 原子读取victimSize
if uintptr(pid) >= size {
return nil
}
locals = p.victim
l := indexLocal(locals, pid) // 取出pid对应Pool
// 先从 private 中取
if x := l.private; x != nil {
l.private = nil
return x
}
// 从其他P的 share 中取
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// Mark the victim cache as empty for future gets don't bother
// with it.
//
// 将 victim cache 标记为空,以便将来获取,不要费心处理它。
atomic.StoreUintptr(&p.victimSize, 0)
return nil
}
|
indexLocal()#
1
2
3
4
|
func indexLocal(l unsafe.Pointer, i int) *poolLocal {
lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
return (*poolLocal)(lp)
}
|
Put()#
Put
方法将x
放入 pool 中
- 把 x 放入池子中时,建议清除上面相关数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
// Put adds x to the pool.
func (p *Pool) Put(x any) {
if x == nil {
return
}
if race.Enabled {
if fastrandn(4) == 0 {
// Randomly drop x on floor.
return
}
race.ReleaseMerge(poolRaceAddr(x))
race.Disable()
}
// 获取 P 对应的 *poolLocal
l, _ := p.pin()
// 如果当前private为nil则把这个存储在这里,等待下次优先被使用
if l.private == nil {
l.private = x
x = nil
}
if x != nil { // 这种情况是private已经有数据了,则放入shared队列中
l.shared.pushHead(x)
}
runtime_procUnpin() // 允许当前工作线程被抢占,原因是pin函数里面加了锁的这里需要解锁
if race.Enabled {
race.Enable()
}
}
|
init()#
- 注册 poolCleanup 函数,在GC开始时调用
- 对于 Pool 而言,并不能无限扩展,否则对象占用内存太多会引起内存溢出(几乎所有的池技术中都会在某个时刻清空或清除部分缓存对象。Go发生在GC时清除部分内存)
- 在 pool.go 文件的 init 函数里,注册GC发生时,如何清理 Pool 的函数 poolCleanup
1
2
3
4
5
6
7
8
9
10
11
12
|
func init() {
runtime_registerPoolCleanup(poolCleanup)
}
// go1.19.3/src/runtime/mgc.go
//go:linkname sync_runtime_registerPoolCleanup sync.runtime_registerPoolCleanup
func sync_runtime_registerPoolCleanup(f func()) {
poolcleanup = f
}
// go1.19.3/src/runtime/mgc.go
var poolcleanup func()
|
使用示例#
Pool
:是一个可以分别存取的临时对象的集合
Pool
:中保存的任何 item
都可能随时不做通告的释放掉
- 如果
Pool
持有该对象的唯一引用,这个 item
就可能被回收
Pool
:可以安全的被多个线程同时使用
Pool
:的目的是缓存申请但未使用 item
用于之后的重用,已减轻GC
的压力
- 也就是说,让创建高效而线程安全的空闲列表更容易
- 但
Pool
并不适合用于多有空闲列表
Pool
:的合理用法是用于管理一组静静的被多个独立并发线程共享并可能重用的临时 item
Pool
:的一个好例子在fmt
包里面
- 该
Pool
维护一个动态大小的临时输出缓存仓库
- 该创库会在过载(许多线程活跃的打印时)增大,在沉寂时缩小
- 另一方面,管理这短寿命对象的空闲列表不适合使用
Pool
- 因为这种情况下内存申请消耗不能很好的分配
- 这时应该由这些对象自己实现空闲列表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
package main
import (
"fmt"
"sync"
)
var pool *sync.Pool
type Person struct {
Name string
}
func initPool() {
pool = &sync.Pool {
New: func() interface{} {
fmt.Println("Creating a new Person")
return new(Person)
},
}
}
func main() {
initPool()
// pool.Get() 返回interface{}
// 然后断言 是否为 *Person
p := pool.Get().(*Person)
fmt.Println("首次从 pool 里获取:", p)
p.Name = "first"
fmt.Printf("设置 p.Name = %s\n", p.Name)
// 将p放回池中
pool.Put(p)
fmt.Println("Pool 里已有一个对象:&{first},调用 Get:", pool.Get().(*Person))
fmt.Println("Pool 没有对象了,调用 Get:", pool.Get().(*Person))
}
|
Creating a new Person
首次从 pool 里获取: &{}
设置 p.Name = first
Pool 里已有一个对象:&{first},调用 Get: &{first}
Creating a new Person
Pool 没有对象了,调用 Get: &{}
- 首先,需要初始化 Pool,唯一需要的就是设置好New函数
- 当调用 Get 方法时,如果池子里缓存了对象,就直接返回缓存的对象
- 如果没有存货,则调用New函数创建一个新的对象
- 另外,我们发现Get方法取出来的对象和上次Put进去的对象实际上是同一个,Pool没有做任何“清空”的处理
- 但我们不应当对此有任何假设,因为在实际的并发使用场景中
- 无法保证这种顺序,最好的做法是在Put前,将对象清空