1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
|
// selectgo implements the select statement.
//
// cas0 points to an array of type [ncases]scase, and order0 points to
// an array of type [2*ncases]uint16 where ncases must be <= 65536.
// Both reside on the goroutine's stack (regardless of any escaping in
// selectgo).
//
// For race detector builds, pc0 points to an array of type
// [ncases]uintptr (also on the stack); for other builds, it's set to
// nil.
//
// nsends and nrecvs are the numbers of send and receive cases; ncases is
// their sum. Cases with index < nsends are treated as sends and the
// remaining cases as receives. If block is false and no case can proceed
// immediately, selectgo returns index -1 instead of parking.
//
// selectgo returns the index of the chosen scase, which matches the
// ordinal position of its respective select{recv,send,default} call.
// Also, if the chosen scase was a receive operation, it reports whether
// a value was received.
//
// If the chosen case is a send on a closed channel, selectgo unlocks the
// channels and panics with "send on closed channel".
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {
if debugSelect {
print("select: cas0=", cas0, "\n")
}
// NOTE: In order to maintain a lean stack size, the number of scases
// is capped at 65536.
cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
// Total number of cases: sends plus receives.
ncases := nsends + nrecvs
// The case set. Indices below nsends are sends, the rest are receives
// (see the casi < nsends / casi >= nsends tests below).
scases := cas1[:ncases:ncases] // the select cases
// Poll order: holds indices into scases. Not filled in yet; it is
// populated in shuffled order below.
pollorder := order1[:ncases:ncases]
// Lock order: holds indices into scases sorted by channel sort key, so
// the channels can be locked in a consistent order. It occupies the
// second half of the order0 array.
lockorder := order1[ncases:][:ncases:ncases] // lock order, not filled in yet
// NOTE: pollorder/lockorder's underlying array was not zero-initialized by compiler.
// Even when raceenabled is true, there might be select
// statements in packages compiled without -race (e.g.,
// ensureSigM in runtime/signal_unix.go).
var pcs []uintptr
if raceenabled && pc0 != nil {
pc1 := (*[1 << 16]uintptr)(unsafe.Pointer(pc0))
pcs = pc1[:ncases:ncases]
}
// casePC returns the recorded PC for case casi, or 0 when no PCs were
// supplied.
casePC := func(casi int) uintptr {
if pcs == nil {
return 0
}
return pcs[casi]
}
// t0 stays 0 unless block profiling is enabled (blockprofilerate > 0).
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// The compiler rewrites selects that statically have
// only 0 or 1 cases plus default into simpler constructs.
// The only way we can end up with such small sel.ncase
// values here is for a larger select in which most channels
// have been nilled out. The general code handles those
// cases correctly, and they are rare enough not to bother
// optimizing (and needing to test).
//
// Step 1: generate permuted order.
// pollorder is filled with the indices of scases in shuffled order so
// that the scan in pass 1 visits the cases in random order.
norder := 0 // number of cases with a non-nil channel seen so far
for i := range scases {
cas := &scases[i]
// Omit cases without channels from the poll and lock orders:
// a case whose channel is nil is skipped entirely.
if cas.c == nil {
cas.elem = nil // allow GC
continue
}
// Pick a random slot j, move the entry at j to the new end slot,
// and put i at j.
// NOTE(review): fastrandn is defined elsewhere; presumably it
// returns a value in [0, norder] here (exclusive upper bound
// norder+1) — confirm.
j := fastrandn(uint32(norder + 1))
pollorder[norder] = pollorder[j]
pollorder[j] = uint16(i)
norder++
}
pollorder = pollorder[:norder] // pollorder now holds the shuffled poll order
lockorder = lockorder[:norder]
// Step 2: sort the cases by Hchan address to get the locking order.
// simple heap sort, to guarantee n log n time and constant stack footprint.
// Sorting also places cases that share a channel next to one another.
//
// First loop: insert each case into a max-heap (keyed by sortkey)
// stored in lockorder.
for i := range lockorder { // build the heap in lockorder
j := i
// Start with the pollorder to permute cases on the same channel.
c := scases[pollorder[i]].c
// Sift up: while the parent's sort key is smaller, move the parent down.
for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
k := (j - 1) / 2
lockorder[j] = lockorder[k]
j = k
}
lockorder[j] = pollorder[i]
}
// Second loop: repeatedly move the heap root (largest key) to position
// i and sift the displaced element o down within the first i entries,
// leaving lockorder in ascending sortkey order.
for i := len(lockorder) - 1; i >= 0; i-- {
o := lockorder[i]
c := scases[o].c
lockorder[i] = lockorder[0]
j := 0
for {
k := j*2 + 1
if k >= i {
break
}
// Choose the child with the larger sort key.
if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
k++
}
if c.sortkey() < scases[lockorder[k]].c.sortkey() {
lockorder[j] = lockorder[k]
j = k
continue
}
break
}
lockorder[j] = o
}
// Debug-only check that lockorder is sorted in non-decreasing sortkey order.
if debugSelect {
for i := 0; i+1 < len(lockorder); i++ {
if scases[lockorder[i]].c.sortkey() > scases[lockorder[i+1]].c.sortkey() {
print("i=", i, " x=", lockorder[i], " y=", lockorder[i+1], "\n")
throw("select: broken sort")
}
}
}
// Step 3: lock all the channels involved in the select.
sellock(scases, lockorder) // lock every channel
var (
gp *g // current goroutine (assigned from getg() in pass 2)
sg *sudog
c *hchan // channel of the case being examined
k *scase
sglist *sudog
sgnext *sudog
qp unsafe.Pointer
nextp **sudog
)
// pass 1 - look for something already waiting
var casi int
var cas *scase
var caseSuccess bool
var caseReleaseTime int64 = -1
var recvOK bool
// Walk the cases in poll order.
for _, casei := range pollorder {
casi = int(casei) // index into scases
cas = &scases[casi]
c = cas.c // channel
// Receive case.
if casi >= nsends { // note: a receive checks whether it can succeed before checking for close
sg = c.sendq.dequeue() // look for a waiting sender
if sg != nil {
goto recv // found one; receive directly from it
}
// The buffer holds data; receive from the buffer.
if c.qcount > 0 {
goto bufrecv
}
// Nothing to receive and the channel is closed.
if c.closed != 0 {
goto rclose
}
} else { // send case; note: a send checks for close first, then whether it can succeed
if raceenabled {
racereadpc(c.raceaddr(), casePC(casi), chansendpc)
}
// The channel is closed.
if c.closed != 0 {
goto sclose
}
sg = c.recvq.dequeue() // look for a waiting receiver
if sg != nil {
goto send
}
// The buffer has free space.
if c.qcount < c.dataqsiz {
goto bufsend
}
}
}
// Step 4: no case can proceed immediately.
// When block is false, unlock and return -1 rather than parking.
// NOTE(review): presumably block == false corresponds to a select with
// a default clause; the caller is not visible here — confirm.
if !block {
selunlock(scases, lockorder) // unlock every channel
casi = -1 // -1: no case was chosen
goto retc
}
// Step 5: enqueue the current goroutine on every channel.
// pass 2 - enqueue on all chans
gp = getg() // current g
if gp.waiting != nil {
throw("gp.waiting != nil")
}
nextp = &gp.waiting
// Walk lockorder; one sudog for the current goroutine is enqueued on
// each case's channel.
for _, casei := range lockorder {
casi = int(casei)
cas = &scases[casi]
c = cas.c // channel
sg := acquireSudog() // obtain a *sudog
sg.g = gp // record the current goroutine
sg.isSelect = true // mark as taking part in a select
// No stack splits between assigning elem and enqueuing
// sg on gp.waiting where copystack can find it.
sg.elem = cas.elem
sg.releasetime = 0
// NOTE(review): -1 presumably asks the waker to record the release
// time when block profiling is active (t0 != 0) — confirm.
if t0 != 0 {
sg.releasetime = -1
}
sg.c = c
// Construct waiting list in lock order.
// The sudogs form a singly linked list rooted at gp.waiting. Pass 3
// below walks it to determine which case fired.
// NOTE(review): selparkcommit (defined elsewhere) presumably also
// walks this list via sudog.c to unlock the channels — confirm.
*nextp = sg
// Link through waitlink.
nextp = &sg.waitlink
// Enqueue on the channel's send or receive queue.
if casi < nsends {
c.sendq.enqueue(sg)
} else {
c.recvq.enqueue(sg)
}
}
// wait for someone to wake us up
// gp.param is cleared here and read back after the wakeup.
gp.param = nil
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
atomic.Store8(&gp.parkingOnChan, 1) // parkingOnChan = 1
// Park this goroutine until it is woken.
gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
// Step 6: execution resumes here after the wakeup.
gp.activeStackChans = false
// Re-lock all channels.
sellock(scases, lockorder) // lock every channel
// Reset selectDone; all channels are locked at this point.
gp.selectDone = 0
// gp.param is interpreted as the *sudog of the case that fired.
// NOTE(review): presumably it is assigned by the waking side in the
// channel send/recv/close paths, which are not visible here — confirm.
sg = (*sudog)(gp.param)
gp.param = nil
// pass 3 - dequeue from unsuccessful chans
// otherwise they stack up on quiet channels
// record the successful case, if any.
// We singly-linked up the SudoGs in lock order.
casi = -1
cas = nil
caseSuccess = false
// gp.waiting heads the list of sudogs built in pass 2.
sglist = gp.waiting
// Clear all elem before unlinking from gp.waiting.
for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
sg1.isSelect = false
sg1.elem = nil
sg1.c = nil
}
gp.waiting = nil
// Step 7: walk lockorder (the order the list was built in) to find
// which case fired, releasing every sudog along the way.
for _, casei := range lockorder {
k = &scases[casei]
if sg == sglist { // this is the case that fired
// sg has already been dequeued by the G that woke us up.
casi = int(casei) // record its index
cas = k
caseSuccess = sglist.success
if sglist.releasetime > 0 {
caseReleaseTime = sglist.releasetime
}
} else { // not the case that fired
c = k.c
// Our sudog is still enqueued on this channel; remove it from
// the matching wait queue.
if int(casei) < nsends {
c.sendq.dequeueSudoG(sglist)
} else {
c.recvq.dequeueSudoG(sglist)
}
}
sgnext = sglist.waitlink // remember the next sudog
sglist.waitlink = nil
releaseSudog(sglist) // release the *sudog
sglist = sgnext
}
if cas == nil {
throw("selectgo: bad wakeup")
}
c = cas.c // channel
if debugSelect {
print("wait-return: cas0=", cas0, " c=", c, " cas=", cas, " send=", casi < nsends, "\n")
}
// Send case.
if casi < nsends {
// A send that was woken without success takes the closed-channel
// panic path.
if !caseSuccess {
goto sclose
}
} else { // receive case
recvOK = caseSuccess
}
if raceenabled {
if casi < nsends {
raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
} else if cas.elem != nil {
raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc)
}
}
if msanenabled {
if casi < nsends {
msanread(cas.elem, c.elemtype.size)
} else if cas.elem != nil {
msanwrite(cas.elem, c.elemtype.size)
}
}
if asanenabled {
if casi < nsends {
asanread(cas.elem, c.elemtype.size)
} else if cas.elem != nil {
asanwrite(cas.elem, c.elemtype.size)
}
}
selunlock(scases, lockorder) // unlock every channel
goto retc
bufrecv: // receive case, and the buffer holds data
// can receive from buffer
if raceenabled {
if cas.elem != nil {
raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc)
}
racenotify(c, c.recvx, nil)
}
if msanenabled && cas.elem != nil {
msanwrite(cas.elem, c.elemtype.size)
}
if asanenabled && cas.elem != nil {
asanwrite(cas.elem, c.elemtype.size)
}
recvOK = true // a value was received
qp = chanbuf(c, c.recvx)
if cas.elem != nil {
// Copy the buffered value to the receive destination.
typedmemmove(c.elemtype, cas.elem, qp)
}
typedmemclr(c.elemtype, qp)
// Advance the receive index, wrapping at dataqsiz.
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
selunlock(scases, lockorder) // unlock every channel
goto retc
bufsend: // send case, and the buffer has free space
// can send to buffer
if raceenabled {
racenotify(c, c.sendx, nil)
raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
}
if msanenabled {
msanread(cas.elem, c.elemtype.size)
}
if asanenabled {
asanread(cas.elem, c.elemtype.size)
}
// Copy the value being sent into the buffer slot at sendx.
typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
// Advance the send index, wrapping at dataqsiz.
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
selunlock(scases, lockorder)
goto retc
recv: // a waiting sender sg was dequeued from sendq
// can receive from sleeping sender (sg)
// Hand off to recv with the parked sender; the callback passed in
// unlocks every channel. No selunlock is issued directly on this path.
recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2) // callback unlocks every channel
if debugSelect {
print("syncrecv: cas0=", cas0, " c=", c, "\n")
}
recvOK = true // a value was received
goto retc
rclose: // receive case on a closed channel
// read at end of closed channel
selunlock(scases, lockorder) // unlock every channel
recvOK = false // nothing was received: the channel is closed
if cas.elem != nil {
// Clear the receive destination because the channel is closed.
typedmemclr(c.elemtype, cas.elem)
}
if raceenabled {
raceacquire(c.raceaddr())
}
goto retc
send: // a waiting receiver sg was dequeued from recvq
// can send to a sleeping receiver (sg)
if raceenabled {
raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
}
if msanenabled {
msanread(cas.elem, c.elemtype.size)
}
if asanenabled {
asanread(cas.elem, c.elemtype.size)
}
// Hand off to send with the parked receiver; the callback passed in
// unlocks every channel. No selunlock is issued directly on this path.
send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
if debugSelect {
print("syncsend: cas0=", cas0, " c=", c, "\n")
}
goto retc
retc:
// Report a block event only when pass 3 recorded a positive release time.
if caseReleaseTime > 0 {
blockevent(caseReleaseTime-t0, 1)
}
return casi, recvOK
sclose: // send case on a closed channel
// send on closed channel
selunlock(scases, lockorder)
panic(plainError("send on closed channel"))
}
|