1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
|
// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: don't need to check ep, as it is always on the stack
// or is new memory allocated by reflect.
//
// Raceenabled:不需要检查ep,因为它总是在堆栈上或由reflect分配新的内存。
if debugChan { // debug
print("chanrecv: chan=", c, "\n")
}
// 1) chan未初始化时,比如 var c chan int
if c == nil {
if !block { // 来自select{case: default:}块
return // false, false
}
// <- nil
// 这里在nil的chan中读取数据,直接切换到调度循环进行新一轮调度
// 这个G后面的代码将不会得到执行,应该当前G既没有加入到P中等待调度,也没有在chan中,永久丢失了
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// 2) Fast path:在未加锁下,判断block为false时,send为空时。
// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// Fast path: 检查未获得锁的失败的非阻塞操作。
if !block && empty(c) { // select{case: default:}块,c为空
// After observing that the channel is not ready for receiving, we observe whether the
// channel is closed.
//
// Reordering of these checks could lead to incorrect behavior when racing with a close.
// For example, if the channel was open and not empty, was closed, and then drained,
// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
// we use atomic loads for both checks, and rely on emptying and closing to happen in
// separate critical sections under the same lock. This assumption fails when closing
// an unbuffered channel with a blocked send, but that is an error condition anyway.
//
// 在观察到通道还没有准备好接收之后,我们观察通道是否关闭。
//
// 在与close竞争时,这些检查的重新排序可能会导致错误的行为。
// 例如,如果channel是打开的且不是空的,被关闭,然后全部取出,重新排序的读数可能会错误地指示"open and empty"。
// 为了防止重新排序,我们对这两种检查都使用了原子加载,并依赖于在同一锁下的不同临界区中进行清空和关闭操作。
// 当以阻塞的发送方式关闭无缓冲的通道时,这个假设就失败了,但无论如何这都是一个错误条件。
if atomic.Load(&c.closed) == 0 { // send为空 并 closed 未关闭,返回 (false, false)
// Because a channel cannot be reopened, the later observation of the channel
// being not closed implies that it was also not closed at the moment of the
// first observation. We behave as if we observed the channel at that moment
// and report that the receive cannot proceed.
//
// 因为channel不能重新打开,所以后来观察到的channel没有关闭意味着在第一次观察的时候它也没有关闭。
// 我们的行为就像我们当时观察到了channel,并报告说接收无法继续。
return
}
// The channel is irreversibly closed. Re-check whether the channel has any pending data
// to receive, which could have arrived between the empty and closed checks above.
// Sequential consistency is also required here, when racing with such a send.
//
// channel是不可逆关闭的。重新检查信道是否有待处理的数据要接收,这些数据可能是在上面的空检查和关闭检查之间到达的。
// 当使用这样的发送方式比赛时,顺序的一致性也是必需的。
if empty(c) { // send为空 并且 closed 已关闭,返回 (true, false)
// The channel is irreversibly closed and empty.
if raceenabled {
raceacquire(c.raceaddr())
}
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 3) 加锁情况下去判断 send 和 sendq 是否有等待 send 的数据。
lock(&c.lock) // 获取runtime.mutex
// channel 已关闭
if c.closed != 0 {
// send buf缓冲区没数据
if c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
// 拷贝对应类型的零值
if ep != nil {
// Typedmemclr清除类型为typ的ptr的类型化内存。
typedmemclr(c.elemtype, ep) // ep 赋值chan元素类型的默认值
}
return true, false
}
// The channel has been closed, but the channel's buffer have data.
//
// channel 已经关闭,但通道的缓冲区有数据。
} else { // channel 未关闭
// Just found waiting sender with not closed.
//
// 刚刚发现等待send未关闭。取出first上的第一个*sudog,这个需要处理
// 这里隐含了 buf 缓存区为空的情况。
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
//
// 发现一个等待发送者。如果缓冲区大小是0,接收值直接从发送方。
// 否则,从队列的头部接收并将发送方的值添加到队列的尾部(两者映射到相同的缓冲槽,因为队列已满)。
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
// buf 缓冲区有数据
if c.qcount > 0 {
// Receive directly from queue
// 直接从缓存区队列中接收数据
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
// 交换数据
if ep != nil {
typedmemmove(c.elemtype, ep, qp) // ep = qp
}
typedmemclr(c.elemtype, qp) // 清除 qp
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount-- // 缓存区元素个数
unlock(&c.lock)
return true, true
}
// 下面代码是需要被阻塞的情况,当前goroutine被动执行调度到c.sendq中去等待
// block 为 false,不想阻塞而返回。
if !block {
unlock(&c.lock)
return false, false
}
// 4) send buf取没有数据或sendq中没有等待的goroutine时。
// no sender available: block on this channel.
// 没有可用的发送者:在此通道上阻塞。
gp := getg() // g
mysg := acquireSudog() // 获取一个空闲的 *sudog
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
// 在分配elem和在gp.waiting上对mysg进行排队之间没有栈拆分,因为在gp.waiting上copystack可以找到它。
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false // 标记不是在select块中来的
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg) // 加入 recvq 队列中
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
// 向任何试图缩小堆栈的人发出信号,我们即将停在一个channel上。
// 当G的状态改变和我们设置gp之间的窗口。activeStackChans 对堆栈收缩不安全。
// goroutine.parkingOnChan表示该goroutine即将停在一个chansend或chanrecv上。用于指示堆栈收缩的不安全点。它是一个布尔值,但会自动更新。
atomic.Store8(&gp.parkingOnChan, 1)
// gopark 保存当前goroutine线程; 调用 releasem(mp) 解除当前m和P的绑定
// 调用mcall(park_m) mcall切换栈到g0 park_m 解除m和g的关联 调用schedule再次开启调度循环
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg) // 回收 sudog
return true, success
}
|