包解释

  1. 该文件包含 Go channels 的实现。
  2. c.sendq 和 c.recvq 中至少有一个是空的,除了使用 select 语句发送和接收的无缓冲 chan 上阻止了单个 goroutine 的情况外,在这种情况下,c.sendq 和c.recvq 的长度仅受 select 语句的大小限制。
  3. select 同时操作单个无缓冲 chan 的读和写这种情况下可能存在 c.sendq 和 c.recvq 都不为空(这种情况下 select不能有 default 分支)。
  4. 对于缓冲 channels,也是:
    • c.qcount > 0 表示 c.recvq 为空。缓存区有值则 c.recvq 一定为空。
    • c.qcount < c.dataqsiz 意味着 c.sendq 是空的。缓存区没有满则 c.sendq 一定为空。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
package runtime

// This file contains the implementation of Go channels.

// Invariants:
//  At least one of c.sendq and c.recvq is empty,
//  except for the case of an unbuffered channel with a single goroutine
//  blocked on it for both sending and receiving using a select statement,
//  in which case the length of c.sendq and c.recvq is limited only by the
//  size of the select statement.
//
// For buffered channels, also:
//  c.qcount > 0 implies that c.recvq is empty.
//  c.qcount < c.dataqsiz implies that c.sendq is empty.

type hchan struct

  1. hchan 结构其实就是一个【有缓冲】和【双向链表】组成的队列。
  2. 这个队列维护着通信的数据,以及挂起等待的 goroutine。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
type hchan struct {
    // 已有元素个数(也就是通道中元素个数)len(chan)
    // 缓存区元素个数,不包括sendq上面的goroutine数量(如果存在)
    qcount   uint   // total data in the queue
    // 数组容量(也就是chan容量)cap(chan)	
    // 也就是 make(chan int, size) 这里的size
    dataqsiz uint   // size of the circular queue
    // 有缓冲数组地址指针,这里是根据 dataqsiz*elemsize 计算分配的内存数组大小
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    // 元素大小,比如int这里存储的就是8,string的话这里存储的就是16
    elemsize uint16
    // 通道是否被关闭 1.被关闭 0.正常
    closed   uint32
    // chan元素类型,指向类型元数据,比如chan int这里记录的就是int的元类型
    elemtype *_type // element type
    // 当前索引(记录下一次send下标),下一次写取位置。
    sendx    uint   // send index
    // 当前索引(记录下一次recv下标),下一次读入位置。
    recvx    uint   // receive index
    // 等待写的队列,是一个双向的goroutine链表
    recvq    waitq  // list of recv waiters
    // 等待读的队列,是一个双向的goroutine链表
    sendq    waitq  // list of send waiters

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    //
    // lock保护hchan中的所有字段,以及sudogs中的一些字段在这个通道上被阻塞。
    // 在持有该锁时,不要改变另一个G的状态(特别是不要准备一个G),因为这可能会导致栈收缩死锁。
    lock mutex	// runtime.mutex 为了应对并发的读写chan,参看runtime.mutex相关文档
}
  1. hchan 缓存区内存布局图(如果存在缓存区时)

raceaddr()

  1. 该函数主要用于 make() 函数中,当申请总内存为0时,返回当前buf字段地址作为 buf 的值。形成指针指向闭环。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func (c *hchan) raceaddr() unsafe.Pointer {
    // Treat read-like and write-like operations on the channel to
    // happen at this address. Avoid using the address of qcount
    // or dataqsiz, because the len() and cap() builtins read
    // those addresses, and we don't want them racing with
    // operations like close().
    // 
    // 将channel上的read-like和write-like操作视为在此地址发生。
    // 避免使用qcount或dataqsiz的地址,因为len()和cap()内置函数会读取这些地址,我们不希望它们与close()等操作竞争。
    return unsafe.Pointer(&c.buf)
}

sortkey()

  1. 该函数在 goselect() 函数中使用,用于返回 chan 地址升序排序 channels。
  2. select 中相关用到的函数。在后面介绍select时,会被使用。
1
2
3
func (c *hchan) sortkey() uintptr {
    return uintptr(unsafe.Pointer(c))
}

Constant

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
const (
    // 最大对齐字节数,主要用于下面的定义。
    maxAlign  = 8
    
    // hchan 占用内存大小字节,使 hchan 按照 maxAlign 大小对齐
    // 比如 hchan 是 12byte,那么 hchanSize 则是 16byte
    // 为什么要使 hchan 对齐 maxAlign?原因是 hchan 后接着是 chan 元素的内存空间块(如果是有缓冲情况下)
    // 这种情况是为了兼容32位,因为在64位下hchan就是8字节对齐的,该字段用于make函数中申请 chan 内存需要。
    hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
    debugChan = false	// debug
)

type waitq struct

  1. waitq 根据 sudog 形成一个双向链表。
  2. 其实就是一个队列的功能,元素【从 last 处添加】,【从 first 处取出】。
1
2
3
4
type waitq struct {
    first *sudog    // 指向链表的首个 *sudog
    last  *sudog    // 指向链表的尾部 *sudog
}

dequeue()

  1. 从 first 中取出一个 *sudog。相当于从队列头(first)取出一个 *sudog。
  2. 调用该方法时 chan lock 锁一定是被持有的。下面函数中需要 for {} 的原因是最后一个 if 条件的 CAS 操作。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func (q *waitq) dequeue() *sudog {
    for {
        sgp := q.first  // 从first处取一个 *sudog
        if sgp == nil {
            return nil
        }
        y := sgp.next
        if y == nil {   // 最后一个 *sudog
            q.first = nil
            q.last = nil
        } else {
            y.prev = nil
            q.first = y
            // 标记已删除(参看 dequeueSudoG)
            sgp.next = nil // mark as removed (see dequeueSudoG)
        }

        // 为什么需要下面的判断条件?
        //  1. select 语句中阻塞了一组 chan 时,所有 channels 的 lock 锁都已被持有。
        //  2. 当前执行select语句的goroutine会被封装到多个*sudog中通过waitlink字段形成链表,然后把各个*sudog的isSelect字段标记为true。
        //  3. 把各个*sudog分别挂在各自的sendq或recvq链表中。等待goroutine被唤醒。
        //  4. 某一时刻*sudog被选中唤醒,则一定会执行当前函数 *sudog.isSelect 是 true,并且通过 CAS 把 g.selectDone 从0标记为1。
        //  5. 注意此时唤醒的goroutine可能在多个channel上面等待。此时可能会出现在其他chan上这个goroutine也被选中了,也在执行当前函数。
        //  6. 则这里会直接跳过。因为当前goroutine已被唤醒,后续会在唤醒的goroutine移除这里goroutine的goselect函数中。
        //  7. goroutine 唤醒后会获取所有的 channels lock,然后把selectDone设置为0,此时因为所有的channel lock已被持有所以能立即修改selectDone。
        
        // if a goroutine was put on this queue because of a
        // select, there is a small window between the goroutine
        // being woken up by a different case and it grabbing the
        // channel locks. Once it has the lock
        // it removes itself from the queue, so we won't see it after that.
        // We use a flag in the G struct to tell us when someone
        // else has won the race to signal this goroutine but the goroutine
        // hasn't removed itself from the queue yet.
        //
        // 如果一个goroutine因为select被放到这个队列上,那么在goroutine被不同的情况唤醒和它获取通道锁之间有一个小窗口。
        // 一旦它有了锁,它就会从队列中移除自己,所以在那之后我们就看不到它了。
        // 我们在G结构体中使用一个标志来告诉我们,当有其他人赢得比赛时,向这个goroutine发出信号,但该goroutine还没有从队列中删除自己。
        if sgp.isSelect && !atomic.Cas(&sgp.g.selectDone, 0, 1) {
            continue
        }

        return sgp
    }
}

enqueue()

  1. 从 last 放入 *sudog。相当于从队列尾(last)添加元素。
  2. 调用该方法时 chan lock 已被持有。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
func (q *waitq) enqueue(sgp *sudog) {
    sgp.next = nil
    x := q.last
    if x == nil {   // waitq 是空的
        sgp.prev = nil
        q.first = sgp
        q.last = sgp
        return
    }
    sgp.prev = x
    x.next = sgp
    q.last = sgp
}

dequeueSudoG()

  1. 在 selectgo() 函数中被用到,用于将指定的 *sudog 取出。
  2. 因为 select 不能立即完成时会挂在所有的 channel,当有 channel 就绪后其他 recvq、sendq 上的需要调用这个函数剔除掉。
  3. 调用该函数是相关的 channel lock 已被持有。
  4. 参数:sgp *sudog 是通过 waitlink 字段组成的 *sudog 链表。
  5. 该函数也是在 go1.19.3/src/runtime/select.go 文件中。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func (q *waitq) dequeueSudoG(sgp *sudog) {
    // 当前 sgp 可能处在链表的任何位置,或者不在链表中。
    x := sgp.prev
    y := sgp.next
    if x != nil {
        if y != nil {
            // middle of queue
            // 
            // 在 queue 的中间
            x.next = y
            y.prev = x
            sgp.next = nil
            sgp.prev = nil
            return
        }
        // end of queue
        // 
        // 在 queue 的最后
        x.next = nil
        q.last = x
        sgp.prev = nil
        return
    }
    if y != nil {
        // start of queue
        // 
        // 在queue开头
        y.prev = nil
        q.first = y
        sgp.next = nil
        return
    }

    // x==y==nil. Either sgp is the only element in the queue,
    // or it has already been removed. Use q.first to disambiguate.
    // 
    // x==y==nil。
    // 要么 sgp 是队列中唯一的成员,要么它已经被删除。使用 q.first 来消除歧义。
    if q.first == sgp {
        q.first = nil
        q.last = nil
    }
}

type sudog struct

  1. sudog 表示等待列表中的 g,例如用于在 channel 上 sending/receiving 。
  2. sudog 是必要的,因为 g↔synchronization 对象关系是多对多的。
  3. 一个 g 可能会出现在许多等待列表中,所以一个g可能会有许多 sudog;
  4. 并且许多 g 可能在同一个 same 对象上等待,因此一个对象可能有许多 sudog。
  5. sudog 是从一个特殊的池中分配的。使用 acquireSudog 和 releaseSudog 来分配和释放它们。
  6. 文件位置:go1.19.3/src/runtime2.go。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// sudog represents a g in a wait list, such as for sending/receiving
// on a channel.
//
// sudog is necessary because the g ↔ synchronization object relation
// is many-to-many. A g can be on many wait lists, so there may be
// many sudogs for one g; and many gs may be waiting on the same
// synchronization object, so there may be many sudogs for one object.
//
// sudogs are allocated from a special pool. Use acquireSudog and
// releaseSudog to allocate and free them.
type sudog struct {
    // The following fields are protected by the hchan.lock of the
    // channel this sudog is blocking on. shrinkstack depends on
    // this for sudogs involved in channel ops.

    g *g        // 等待的goroutine

    next *sudog // 链接下一个*sudog     对于二叉树就是left
    prev *sudog // 链接前一个*sudog     对于二叉树就是right
    // seampher中:保存来自信号量的地址;比如在sync.Mutex中则是&sync.Mutex.sema该字段的地址。
    // channels中:则是保存需要传递的值的地址。(需要交换的数据地址)
    elem unsafe.Pointer // data element (may point to stack)

    // The following fields are never accessed concurrently.
    // For channels, waitlink is only accessed by g.
    // For semaphores, all fields (including the ones above)
    // are only accessed when holding a semaRoot lock.
    //
    // 以下字段永远不会并发访问。
    // 对于 channels,waitlink 只能由 g 访问。
    // 对于 semaphores,所有字段(包括上面的字段)只有在持有 semaRoot 锁时才能访问。

    // 以下时间都是为了分析 sudog
    acquiretime int64   // 获得 sudog 的时间
    releasetime int64   // 释放时间	
    
    // ticket 用于形成最小堆,从root往下按照 s.ticket <= both s.prev.ticket AND s.next.ticket; 最小堆就是一种完全二叉树
    //  1. ticket 在 semaRoot.queue 函数中作为二叉树枝干情况下被初始化为 s.ticket = fastrand() | 1;  s.ticket >= 0
    //  2. ticket 在 semaRoot.dequeue 函数中返回 sudog 时,被重置为0
    //  3. ticket 在 sync_runtime_SemacquireMutex 函数中 如果是饥饿模式则标记为1
    // 最小堆百度百科:https://baike.baidu.com/item/%E6%9C%80%E5%B0%8F%E5%A0%86/9139372
    ticket      uint32

    // isSelect indicates g is participating in a select, so
    // g.selectDone must be CAS'd to win the wake-up race.
    //
    // isSelect 表示 g 正在参与一个 select,因此 g.selectDone 必须经过 CAS 处理才能赢得唤醒竞赛。
    // 具体参考 goselect() 函数。当前是因为 select 语句被挂起时,该字段会被设置为 true。
    isSelect bool // 在select结构中被使用

    // success indicates whether communication over channel c
    // succeeded. It is true if the goroutine was awoken because a
    // value was delivered over channel c, and false if awoken
    // because c was closed.
    //
    // success c 通道通信是否成功。
    // 如果 goroutine 因为通过通道 c 传递值而被唤醒,则为 true,如果因为通道 c 被关闭而被唤醒,则为 false。
    success bool  // 在chan中被使用,用于判断本次通信是否成功

    // semaRoot的二叉树
    parent   *sudog // semaRoot binary tree
    // g.waiting 列表或 semaRoot 的等待链表,指向链表的头。
    // 在 goselect() 函数中,挂起的 goroutine 组装的 *sudog 通过 waitlink 字段形成链表。
    // waitlink 只在 semapher 或 select 语句中被用来形成链表。
    waitlink *sudog // g.waiting list or semaRoot
    // 等待尾部 semaRoot,指向链表的尾部
    waittail *sudog // semaRoot
    // 当前sudog所属*hchan,select不能就绪要被挂起时用到。
    c        *hchan // channel
}

make()

初始化 channel。

makechan()

  1. 函数原型:make(chan Type, size int)
  2. 参数:
    • t *chantype:chan 元类型结构。
    • size int:chan 大小,默认0无缓冲 chan;>=1 都是有缓冲 chan。
type chantype struct {
    typ  _type      // chan元类型
    elem *_type     // chan 元素元类型
    dir  uintptr    // 通道方向
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
func makechan(t *chantype, size int) *hchan {
    elem := t.elem  // chan元素元类型

    // compiler checks this but be safe.
    // 
    // 编译器会检查这一点,但这是安全的。
    if elem.size >= 1<<16 { // chan元素类型内存 >= 1<<16时不适合chan
        throw("makechan: invalid channel element type")
    }
    // hchan 是否对齐 maxAlign
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }

    // mem = elem.size * uintptr(size)
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 { // 内存溢出
        panic(plainError("makechan: size out of range"))
    }

    // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
    // buf points into the same allocation, elemtype is persistent.
    // SudoG's are referenced from their owning thread so they can't be collected.
    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    //
    // 当存储在buf中的元素不包含指针时,hchan不包含GC感兴趣的指针。
    // (因此可直接在无指针内存块分配)。buf指向相同的内存分配,elemtype是持久的。
    // Sudog 是在它们自己的线程中引用的,所以它们无法被收集。
    // TODO(dvyukov,rlh):重新考虑收集器何时可以移动已分配的对象。
    var c *hchan // nil
    switch {
    // 【make(chan struct{}, n)】 OR 【make(chan int, 0)】 形式
    case mem == 0: // channel 内存为零
        // Queue or element size is zero.
        // 
        // Queue 或 element 的大小为0。
        // 注意这里申请的内存规格块是无指针的,具体原因前面注释有解释
        c = (*hchan)(mallocgc(hchanSize, nil, true)) // 申请hchan需要的内存空间
        // Race detector uses this location for synchronization.
        // 
        // 竞态检测器使用此位置进行同步。
        c.buf = c.raceaddr() // c.buf = &c.buf
    case elem.ptrdata == 0: // channel 内存不为零,元素不包含指针
        // Elements do not contain pointers.
        // Allocate hchan and buf in one call.
        // 
        // 元素不包含指针。一次调用即可分配 hchan 和 buf。
        // hchanSize 主要是为了这里的对齐。注意这里申请的内存规格块是无指针的,具体原因前面注释有解释
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) // 申请【hchan + buf】需要内存块
        c.buf = add(unsafe.Pointer(c), hchanSize) // 可见申请的是一整块内存空间
    default: // channel 内存不为零,元素包含指针
        // Elements contain pointers.
        // 
        // 元素包含指针。
        c = new(hchan) 
        // 申请元素需要的内存块
        // 注意这里申请的是有指针内存规格块,具体原因是chan元素类型有指针可能存在多级指针引用,需要GC帮助
        c.buf = mallocgc(mem, elem, true)
    }

    c.elemsize = uint16(elem.size) // chan 元素大小
    c.elemtype = elem              // chan元素元类型
    c.dataqsiz = uint(size)        // 有缓存容量,一旦初始化就是确定的值
    // 初始化 runtime.mutex,主要是初始化锁排名
    lockInit(&c.lock, lockRankHchan)	

    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
    }
    return c
}

makechan64()

1
2
3
4
5
6
7
func makechan64(t *chantype, size int64) *hchan {
    if int64(int(size)) != size { // 如果在32位系统下这里会报错
        panic(plainError("makechan: size out of range"))
    }

    return makechan(t, int(size))
}

c <- ep

ep 发送到 c 中。

chansend1()

  1. 编译后代码中 c <- x 的入口点。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
//
// c <- ep
//
// as 
//
// chansend1(c, &ep)
// 

// entry point for c <- x from compiled code
//
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
    chansend(c, elem, true, getcallerpc())
}

chansend()

  1. 通用单通道 send/recv,如果 block 不是 nil,那么协议将不会进入睡眠状态,如果无法完成则返回。
  2. 当涉及 sleep 的通道被关闭时,sleep 可以使用 g.param == nil 唤醒。循环并重新运行操作是最简单的;我们会看到它现在已经关闭了。
  3. 参数:
    • c *hchan:hchan 结构体的指针。指向要来用 send 数据的 channel。
    • ep unsafe.Pointer:ep 是 c <- ep 需要发送到 chan 的数据地址。
      • 是一个指针,指向要被送入通道 c 的数据,数据类型要和 c 的元素类型一致。
    • block bool:false.不能立即完成时不阻塞。 true.不能立即完成时阻塞。
      • 表示如果 send 操作不能立即完成,是否想要阻塞等待。
      • block bool 参数 false 状态用于 select{case: default:} 形式中。true 状态用于 c <- ep 情况下。
    • callerpc uintptr:是 c <- ep 的下一条代码指令地址。用于进行race相关检测。
  4. 返回值:
    • bool:true.数据 send 完成。false.表示目前不能发送,但因为不想阻塞(block为false)而返回。
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
/*
 * generic single channel send/recv
 * If block is not nil,
 * then the protocol will not
 * sleep but return if it could
 * not complete.
 *
 * sleep can wake up with g.param == nil
 * when a channel involved in the sleep has
 * been closed.  it is easiest to loop and re-run
 * the operation; we'll see that it's now closed.
 */
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // 1) chan没有初始化,比如 var c chan int形式
    if c == nil {
        if !block {      // select{case: default:} 块
            return false // 返回false,表示未发送数据。
        }
        
        // nil <- x:如果block为true,就让当前协程永久地阻塞在这个nil通道上。
        
        // 处理相关goroutine然后再次进行新一轮调度。
        // 注意:这里的goroutine将永久丢失,因为这个goroutine没有被放入队列中等待被调度,
        // 还有就是c <- x这行代码后的所有代码都不会在被执行。
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    if debugChan { // debug
        print("chansend: chan=", c, "\n")
    }

    if raceenabled {
        racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
    }

    // Fast path: check for failed non-blocking operation without acquiring the lock.
    //
    // After observing that the channel is not closed, we observe that the channel is
    // not ready for sending. Each of these observations is a single word-sized read
    // (first c.closed and second full()).
    // Because a closed channel cannot transition from 'ready for sending' to
    // 'not ready for sending', even if the channel is closed between the two observations,
    // they imply a moment between the two when the channel was both not yet closed
    // and not ready for sending. We behave as if we observed the channel at that moment,
    // and report that the send cannot proceed.
    //
    // It is okay if the reads are reordered here: if we observe that the channel is not
    // ready for sending and then observe that it is not closed, that implies that the
    // channel wasn't closed during the first observation. However, nothing here
    // guarantees forward progress. We rely on the side effects of lock release in
    // chanrecv() and closechan() to update this thread's view of c.closed and full().
    //
    // Fast path: 在未获得锁的情况下检查失败的非阻塞操作。
    // 在观察到通道没有关闭之后,我们观察到通道还没有准备好发送。每个观察值都是单个word-sized的读取(第一个是c.closed,第二个是full())。
    // 因为一个封闭的通道不能从'ready for sending'过渡到'not ready for sending',即使在两次观测之间通道是关闭的,
    // 它们也意味着在两次观测之间通道既没有关闭也没有准备好发送的时刻。
    // 我们的行为就像我们当时观察到通道一样,并报告发送无法继续。
    // 在这里,如果读操作被重新排序是可以的:如果我们观察到通道还没有准备好发送,然后又观察到它没有关闭,这意味着在第一次观察期间通道没有关闭。
    // 然而,这里没有任何东西能保证取得进展。我们依赖chanrecv()和closechan()中锁释放的副作用来更新这个线程的c.closed和full()视图。
    if !block && c.closed == 0 && full(c) { // select块中 && chan未关闭 && c已满
        // 如果block为false且closed为0,也就是在不想阻塞且通道未关闭的前提下,如果通道满了(无缓冲且recvq为空,或者有缓存且缓冲已用尽),
        // 则直接返回false。
        // 本步判断是在不加锁的情况下进行的,目的是让非阻塞send在无法立即完成时能真正不阻塞(加锁操作可能阻塞)。
        return false
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    // 2) 尝试获取 runtime.mutex 互斥锁
    // 对hchan加锁,如果closed不为0,即通道已经关闭,则先解锁,然后panic。因为不允许用已关闭的通道进行send。
    lock(&c.lock)

    // 3) 向已关闭的chan 发送数据直接panic
    // 这里存在获取runtime.mutex期间其他协程已经把c关闭的情况,这里会直接panic
    // 因此chan的关闭是要确保所有的send操作已完成后再进行
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }

    // 4) 从 recvq 中去找出一个正在等待的 *sudog
    // 如果 recvq 不为空,隐含了缓冲区为空,就从中取出第1个排队的协程,将数据传给这个协程,并将该协程置为ready状态
    //(放入run queue,进而得到调度),然后解锁,然后返回true。
    if sg := c.recvq.dequeue(); sg != nil { // 存在等待的 goroutine 直接交换数据即可
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        //
        // 找到了一个等待的接收器。我们将想要直接发送的值传递给接收器,绕过通道缓冲区(如果有的话)。
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

    // 5) 缓冲区还有空间,直接把数据放入即可
    // 通过比较 qcount 和 dataqsiz 判断缓存区是否还有剩余空间,在这里无缓冲的通道被视为没有剩余空间。
    // 如果有剩余空间,将数据追加到缓冲区中,相应地移动 sendx,增加 qcount,然后解锁,返回值为 true。
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        // 
        // 通道缓冲区中有可用空间。对要发送的元素进行排队。
        qp := chanbuf(c, c.sendx)           // 下一个空闲插槽地址
        if raceenabled {
            racenotify(c, c.sendx, nil)
        }
        // 把c <- ep这里的ep值复制到qp这个地址中,实现把ep放入buf中
        typedmemmove(c.elemtype, qp, ep)    // 如果元素大小为0不会有任何操作
        c.sendx++                           // 下一次空闲位置
        if c.sendx == c.dataqsiz {          // 到达最大索引
            c.sendx = 0
        }
        c.qcount++                          // 已存储的数量加一
        unlock(&c.lock)
        return true
    }

    // 6) 如果上面4和5都不满足,并且是在select中,那么就直接返回false,表示当前分支不会选中 
    // 运行到这里表明通道已满,如果block为false,即不想阻塞,则解锁,返回值为false。
    if !block {
        unlock(&c.lock)
        return false
    }
    
    // 7) 下面是 c <- x 写操作需要阻塞的情况 

    // Block on the channel. Some receiver will complete our operation for us.
    // 
    // 在通道上阻塞。有人会替我们完成我们的操作。
    gp := getg() // goroutine
    // 获取一个空闲的 *sudog
    mysg := acquireSudog()	
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    // 
    // 在分配elem和在gp上排队mysg之间没有堆栈分裂。在拷贝堆能找到的地方等待。
    mysg.elem = ep
    // waitlink 只在 semapher 或 select 语句中被使用到。
    // 用来链接多个 sudog
    mysg.waitlink = nil
    mysg.g = gp
    // 标记当前 sudog 不是来自select语句
    mysg.isSelect = false
    mysg.c = c
    // 标记goroutine正在*sudog这中等待(一个有效的elem ptr); in lock order
    gp.waiting = mysg
    gp.param = nil
    // 放入 sendq queue 中等待
    c.sendq.enqueue(mysg)	
    // Signal to anyone trying to shrink our stack that we're about
    // to park on a channel. The window between when this G's status
    // changes and when we set gp.activeStackChans is not safe for
    // stack shrinking.
    // 
    // 向任何试图缩小堆栈的人发出信号,我们即将停在一个channel上。
    // 当G的状态改变和我们设置gp之间的窗口。activeStackChans 对堆栈收缩不安全。
    // goroutine.parkingOnChan表示该goroutine即将停在一个chansend或chanrecv上。
    // 用于指示堆栈收缩的不安全点。它是一个布尔值,但会自动更新。
    atomic.Store8(&gp.parkingOnChan, 1) // 表示这段时间chan正在parking中
    // gopark—>mcall->park_m->schedule()
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)	// 调离当前g进入调度循环
    
    // 当前g被再次调度起来时,继续这里执行
    
    // Ensure the value being sent is kept alive until the
    // receiver copies it out. The sudog has a pointer to the
    // stack object, but sudogs aren't considered as roots of the
    // stack tracer.
    // 
    // 确保正在发送的值在接收方复制出来之前都是有效的。
    // sudog有一个指向栈对象的指针,但sudogs不被认为是栈跟踪器的roots。
    // https://zhuanlan.zhihu.com/p/213744309
    KeepAlive(ep) // 保持ep是活跃的,因为ep来自用户端可能ep会被回收

    // someone woke us up.
    // 
    // 有人把我们叫醒了,可能是正常 <-c 或者 close() 函数
    if mysg != gp.waiting {	// 当前 gp 是否等待在 mysq 上
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    // activeStackChans表示有未锁定的通道指向这个goroutine的堆栈。
    // 如果为true,堆栈复制需要获得通道锁来保护堆栈的这些区域。
    // activeStackChans 字段在 gopark 中 chanparkcommit 函数中被设置为true,因此是go被调离CPU时候设置为true,在唤醒后设置为false。
    // 具体参看栈 runtime.copystack 函数。
    gp.activeStackChans = false
    // 来自close()函数唤醒时,closed为true。
    closed := !mysg.success // 如果是有close唤醒的这里success为false并closed为1
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil
    releaseSudog(mysg)	// 回收*sudog
    // 这里也解释了关闭channel需要谨慎操作。
    // 一个goroutine正在send,而另外一个goroutine却close了,这里就会panic。
    if closed {
        // 来自close()函数唤醒时closed字段应该为1。
        if c.closed == 0 {
            throw("chansend: spurious wakeup") // chansend 虚假唤醒
        }
        // c.closed == 1时,还存在send的g却关闭了chan报错
        // 因此 close() 函数不要在还有send未完成时调用。
        panic(plainError("send on closed channel")) // send 在关闭的 channel 上
    }
    return true
}

full()

  1. full() 报告在 c 上的发送是否会阻塞(即通道已满)。
  2. 它使用了一个可变状态的word-sized的读取,因此尽管答案立即为true,但在调用函数收到返回值时,正确的答案可能已经改变了。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
// full reports whether a send on c would block (that is, the channel is full).
// It uses a single word-sized read of mutable state, so although
// the answer is instantaneously true, the correct answer may have changed
// by the time the calling function receives the return value.
func full(c *hchan) bool {
    // c.dataqsiz is immutable (never written after the channel is created)
    // so it is safe to read at any time during channel operation.
    //
    // c.dataqsiz是不可变的(在通道创建后永不写入),因此在通道操作期间的任何时候读取都是安全的。
    if c.dataqsiz == 0 {
        // Assumes that a pointer read is relaxed-atomic.
        // 
        // 假定指针读取是relaxed-atomic的。
        return c.recvq.first == nil
    }
    // Assumes that a uint read is relaxed-atomic.
    // 
    // 假设uint read是relax-atomic。
    return c.qcount == c.dataqsiz
}

send()

  1. 交换数据,并调用goready把等待的G放入p中等待调度。
  2. send 处理空通道 c 上的发送操作。
  3. 发送端发送的值 ep 被复制到接收端sg。然后,接收者被唤醒,继续它的快乐之路。
  4. 通道c必须是空的并被锁定。用unlockf发送解锁c。sg必须已经从c中退出队列。
  5. ep必须是非空值,并且指向堆或调用者的堆栈。
  6. 参数:
    • c *hchan:hchan 结构体指针。
    • sg *sudog:是等待 ep <- c 的 g。
    • ep unsafe.Pointer:是 c <- ep 的数据地址。
    • unlockf func():闭包函数解锁 c.lock。
    • skip int:skip 跳过步骤。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked.  send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if raceenabled {
        if c.dataqsiz == 0 {
            racesync(c, sg)
        } else {
            // Pretend we go through the buffer, even though
            // we copy directly. Note that we need to increment
            // the head/tail locations only when raceenabled.
            racenotify(c, c.recvx, nil)
            racenotify(c, c.recvx, sg)
            c.recvx++
            if c.recvx == c.dataqsiz {
                c.recvx = 0
            }
            c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
        }
    }
    // sg.elem 是等待读的地址,也就是ep <- c这里的ep地址 
    if sg.elem != nil {
        // 把ep复制到sg.elem中,这样就完成了chan的数据交换
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }
    gp := sg.g  // 取出准备恢复的g
    // 把 hchan 解锁。
    unlockf()
    // 唤醒时传递的参数。主要用与 select 语句唤醒后使用。
    // select 拿着这个参数的值做比对,是哪个 case 就绪了。
    gp.param = unsafe.Pointer(sg) // g.param = *sudog
    // 把sg.success标记为true表示数据已经交换成功
    sg.success = true
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    goready(gp, skip+1)
}

sendDirect()

  1. 从 src -> sg.elem。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
    // src is on our stack, dst is a slot on another stack.
    // 
    // src在我们的堆栈上,dst是另一个堆栈上的插槽。

    // Once we read sg.elem out of sg, it will no longer
    // be updated if the destination's stack gets copied (shrunk).
    // So make sure that no preemption points can happen between read & use.
    // 
    // 一旦我们从sg中读入sg.elem,如果目标堆栈被复制(收缩),它将不再被更新。
    // 因此,请确保在读取和使用之间不会发生抢占点。
    dst := sg.elem
    // typeBitsBulkBarrier对memmove使用类型位图定位指针槽将[src, src+size)复制到[dst, dst+size)的每个指针执行写屏障。
    // 类型typ必须精确对应于[src, src+size)和[dst, dst+size)。dst、src和size必须是指针对齐的。
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    // No need for cgo write barrier checks because dst is always
    // Go memory.
    // 
    // 不需要cgo写屏障检查,因为dst总是Go内存。
    memmove(dst, src, t.size)	// src -> dst
}

goready()

  1. 恢复gp也就是goroutine前进行栈切换到g0栈。
1
2
3
4
5
func goready(gp *g, traceskip int) {
    systemstack(func() {	// 切换到g0栈
        ready(gp, traceskip, true)
    })
}

ready()

  1. 恢复gp也就是这个goroutine相关的状态,然后放入P中等待被调度。
  2. 标记 gp 准备运行。
  3. 参数:
    • gp *g:当前需要恢复的goroutine
    • traceskip int:检查跳过步骤
    • next boolnexttrue时,则将gp放入到_p_.runnext
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Mark gp ready to run.
func ready(gp *g, traceskip int, next bool) {
    if trace.enabled {
        traceGoUnpark(gp, traceskip)
    }

    // 获取当前gp这个goroutine的状态
    status := readgstatus(gp)

    // Mark runnable.
    _g_ := getg()    // 当前正在运行的g这里是g0
    // 禁用抢占,因为它可以在本地变量中持有p
    mp := acquirem() // disable preemption because it can be holding p in a local var
    if status&^_Gscan != _Gwaiting {
        dumpgstatus(gp)
        throw("bad g->status in ready")
    }

    // status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
    casgstatus(gp, _Gwaiting, _Grunnable) // 切换gp的状态
    // gp放入p的本地队列,这里next是true时,会放入next字段会优先调度起来
    runqput(_g_.m.p.ptr(), gp, next) 
    // 因为有goroutine放入队列中,尝试唤醒其他工作线程起来工作
    wakep() // 有goroutine被放回队列该函数就会紧接着被调用
    releasem(mp)
}

chanbuf()

  1. chanbuf(c, i) 是指向缓冲区第 i 个槽的指针。
1
2
3
4
// chanbuf(c, i) is pointer to the i'th slot in the buffer.
func chanbuf(c *hchan, i uint) unsafe.Pointer {
    return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}

ep <- c

  1. 从 c 中读取数据。
  2. 编译后代码中 <- c 的入口点。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
//
// ep <- c
//
// as  
//
// chanrecv1(c, &ep)
//

// entry points for <- c from compiled code
//
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

chanrecv()

  1. chanrecv 在通道 c 接收数据,并将接收到的数据写入 ep。
  2. ep可以是nil,在这种情况下接收到的数据将被忽略。
  3. 如果block == false且没有元素可用,则返回(false, false)。否则,如果c是关闭的,则*ep设置成零值并返回(true, false)。否则,用一个元素填充*ep并返回(true, true)。
  4. 非nil的ep必须指向堆或调用者的堆栈。
  5. 参数:
    • c *hchan:hchan结构体指针,也就是 ep <- c 中 c 的结构体指针。指向要从recv数据的channel。
    • ep unsafe.Pointer:接收变量地址,也就是 ep <- c 中ep变量的地址。是一个指针,指向用来接收数据的内存,数据类型要和c的元素类型一致。
    • block bool:true.表示如果recv操作不能立即完成,是否想要阻塞等待。true.不能立即完成则阻塞,false.不能立即完成不阻塞,用于select{case: default:}形式。
  6. 返回值:这两个参数都是用于 select{case: default:}形式的。selected表示当前case被选中。received表示数据有没交换成功。
    • selected bool:true.表示操作完成(可能因为通道已经关闭)。false.表示目前不能立即完成recv,但因为不想阻塞(block为false)而返回。
    • received bool:true.表示数据确实是从通道中接收的,不是因为通道关闭而得到的零值。false.可能是因为通道关闭而得到的零值(selected为true),或者因为不想阻塞而返回(selected为false)。
  7. 返回值的组成:
    1. (true, true):操作已完成,确实是从channel中接收的(不是因为channel关闭而得到的零值)。(当前分支被选中,数据也交换成功了)
    2. (false, false):目前不能立即完成,因为不想阻塞而返回。(当前分支没被选中,走default吧)
    3. (true, false):操作已完成,因为通道关闭而返回零值。(当前分支被选中,因close而返回默认零值)
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // raceenabled: don't need to check ep, as it is always on the stack
    // or is new memory allocated by reflect.
    // 
    // Raceenabled:不需要检查ep,因为它总是在堆栈上或由reflect分配新的内存。

    if debugChan {  // debug
        print("chanrecv: chan=", c, "\n")
    }

    // 1) chan未初始化时,比如 var c chan int
    if c == nil {
        if !block { // 来自select{case: default:}块
            return  // false, false
        }
        
        // <- nil
        
        // 这里在nil的chan中读取数据,直接切换到调度循环进行新一轮调度
        // 这个G后面的代码将不会得到执行,应该当前G既没有加入到P中等待调度,也没有在chan中,永久丢失了
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    // 2) Fast path:在未加锁下,判断block为false时,send为空时。
    
    // Fast path: check for failed non-blocking operation without acquiring the lock.
    // 
    // Fast path: 检查未获得锁的失败的非阻塞操作。
    if !block && empty(c) { // select{case: default:}块,c为空
        // After observing that the channel is not ready for receiving, we observe whether the
        // channel is closed.
        //
        // Reordering of these checks could lead to incorrect behavior when racing with a close.
        // For example, if the channel was open and not empty, was closed, and then drained,
        // reordered reads could incorrectly indicate "open and empty". To prevent reordering,
        // we use atomic loads for both checks, and rely on emptying and closing to happen in
        // separate critical sections under the same lock.  This assumption fails when closing
        // an unbuffered channel with a blocked send, but that is an error condition anyway.
        //
        // 在观察到通道还没有准备好接收之后,我们观察通道是否关闭。
        // 
        // 在与close竞争时,这些检查的重新排序可能会导致错误的行为。
        // 例如,如果channel是打开的且不是空的,被关闭,然后全部取出,重新排序的读数可能会错误地指示"open and empty"。
        // 为了防止重新排序,我们对这两种检查都使用了原子加载,并依赖于在同一锁下的不同临界区中进行清空和关闭操作。
        // 当以阻塞的发送方式关闭无缓冲的通道时,这个假设就失败了,但无论如何这都是一个错误条件。
        if atomic.Load(&c.closed) == 0 { // send为空 并 closed 未关闭,返回 (false, false)
            // Because a channel cannot be reopened, the later observation of the channel
            // being not closed implies that it was also not closed at the moment of the
            // first observation. We behave as if we observed the channel at that moment
            // and report that the receive cannot proceed.
            // 
            // 因为channel不能重新打开,所以后来观察到的channel没有关闭意味着在第一次观察的时候它也没有关闭。
            // 我们的行为就像我们当时观察到了channel,并报告说接收无法继续。
            return
        }
        // The channel is irreversibly closed. Re-check whether the channel has any pending data
        // to receive, which could have arrived between the empty and closed checks above.
        // Sequential consistency is also required here, when racing with such a send.
        // 
        // channel是不可逆关闭的。重新检查信道是否有待处理的数据要接收,这些数据可能是在上面的空检查和关闭检查之间到达的。
        // 当使用这样的发送方式比赛时,顺序的一致性也是必需的。
        if empty(c) { // send为空 并且 closed 已关闭,返回 (true, false)
            // The channel is irreversibly closed and empty.
            if raceenabled {
                raceacquire(c.raceaddr())
            }
            if ep != nil {
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    // 3) 加锁情况下去判断 send 和 sendq 是否有等待 send 的数据。
    
    lock(&c.lock) // 获取runtime.mutex

    // channel 已关闭
    if c.closed != 0 {	
        // send buf缓冲区没数据
        if c.qcount == 0 {	
            if raceenabled {
                raceacquire(c.raceaddr())
            }
            unlock(&c.lock)
            // 拷贝对应类型的零值
            if ep != nil {
                // Typedmemclr清除类型为typ的ptr的类型化内存。
                typedmemclr(c.elemtype, ep) // ep 赋值chan元素类型的默认值
            }
            return true, false
        }
        // The channel has been closed, but the channel's buffer have data.
        // 
        // channel 已经关闭,但通道的缓冲区有数据。
    } else { // channel 未关闭
        // Just found waiting sender with not closed.
        // 
        // 刚刚发现等待send未关闭。取出first上的第一个*sudog,这个需要处理
        // 这里隐含了 buf 缓存区为空的情况。
        if sg := c.sendq.dequeue(); sg != nil {
            // Found a waiting sender. If buffer is size 0, receive value
            // directly from sender. Otherwise, receive from head of queue
            // and add sender's value to the tail of the queue (both map to
            // the same buffer slot because the queue is full).
            // 
            // 发现一个等待发送者。如果缓冲区大小是0,接收值直接从发送方。
            // 否则,从队列的头部接收并将发送方的值添加到队列的尾部(两者映射到相同的缓冲槽,因为队列已满)。
            recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
            return true, true
        }
    }

    // buf 缓冲区有数据
    if c.qcount > 0 {
        // Receive directly from queue
        // 直接从缓存区队列中接收数据
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            racenotify(c, c.recvx, nil)
        }
        // 交换数据
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp) // ep = qp
        }
        typedmemclr(c.elemtype, qp)          // 清除 qp
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--                           // 缓存区元素个数
        unlock(&c.lock)
        return true, true
    }

    // 下面代码是需要被阻塞的情况,当前goroutine被动执行调度到c.sendq中去等待
    
    // block 为 false,不想阻塞而返回。
    if !block {
        unlock(&c.lock)
        return false, false
    }

    // 4) send buf取没有数据或sendq中没有等待的goroutine时。
    
    // no sender available: block on this channel.
    // 没有可用的发送者:在此通道上阻塞。
    gp := getg()            // g
    mysg := acquireSudog()  // 获取一个空闲的 *sudog
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    // 在分配elem和在gp.waiting上对mysg进行排队之间没有栈拆分,因为在gp.waiting上copystack可以找到它。
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false   // 标记不是在select块中来的
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)   // 加入 recvq 队列中
    // Signal to anyone trying to shrink our stack that we're about
    // to park on a channel. The window between when this G's status
    // changes and when we set gp.activeStackChans is not safe for
    // stack shrinking.
    // 向任何试图缩小堆栈的人发出信号,我们即将停在一个channel上。
    // 当G的状态改变和我们设置gp之间的窗口。activeStackChans 对堆栈收缩不安全。
    // goroutine.parkingOnChan表示该goroutine即将停在一个chansend或chanrecv上。用于指示堆栈收缩的不安全点。它是一个布尔值,但会自动更新。
    atomic.Store8(&gp.parkingOnChan, 1)
    // gopark 保存当前goroutine线程; 调用 releasem(mp) 解除当前m和P的绑定
    // 调用mcall(park_m) mcall切换栈到g0 park_m 解除m和g的关联 调用schedule再次开启调度循环
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

    // someone woke us up
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    success := mysg.success
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg) // 回收 sudog
    return true, success
}

empty()

  1. Empty报告从c读取数据是否会阻塞(即channel是空的)。它使用单个原子读取可变状态。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// empty reports whether a read from c would block (that is, the channel is
// empty).  It uses a single atomic read of mutable state.
func empty(c *hchan) bool {
    // c.dataqsiz is immutable.
    // 
    // c.dataqsiz 是不可变的
    if c.dataqsiz == 0 {
        // 是否有等待 send 的 goroutine
        return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
    }
    // 缓存区是否有数据
    return atomic.Loaduint(&c.qcount) == 0
}

recv()

  1. recv 在一个满的缓存区的channel c上处理接收操作
  2. 有以下两部分:
    1. 发送放sg发送的值被放入channel,发送方被唤醒
    2. 接收端接收到的值(当前G)被写入ep
  3. 对于同步channel,这两个值是相同的
  4. 对于异步channel,接收方从通道缓冲区获取数据,发送方的数据放在通道缓冲区中。
  5. 通道c必须已满且已锁定。recv用unlockf解锁c。sg必须已经从c中退出队列。
  6. 非nil的ep必须指向堆或调用者的堆栈。
  7. 参数:
    1. c *hchan:当前 chan
    2. sg *sudog:从 recvq 中的first取出的第一个 *sudog
    3. ep unsafe.Pointer:v <- c 中的v地址
    4. unlockf func():解锁 runtime.mutex 的闭包函数
    5. skip int:调试相关
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// recv processes a receive operation on a full channel c.
// There are 2 parts:
//  1. The value sent by the sender sg is put into the channel
//     and the sender is woken up to go on its merry way.
//  2. The value received by the receiver (the current G) is
//     written to ep.
//
// For synchronous channels, both values are the same.
// For asynchronous channels, the receiver gets its data from
// the channel buffer and the sender's data is put in the
// channel buffer.
// Channel c must be full and locked. recv unlocks c with unlockf.
// sg must already be dequeued from c.
// A non-nil ep must point to the heap or the caller's stack.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if c.dataqsiz == 0 { // 无缓冲 chan
        if raceenabled {
            racesync(c, sg)
        }
        if ep != nil {
            // copy data from sender
            // 从发送方复制数据
            recvDirect(c.elemtype, sg, ep) // ep = sg
        }
    } else {            // 有缓冲 chan
        // Queue is full. Take the item at the
        // head of the queue. Make the sender enqueue
        // its item at the tail of the queue. Since the
        // queue is full, those are both the same slot.
        // 队列已满。以队列最前面的项为例。
        // 让发送方在队列的尾部将其项目入队。
        // 因为队列已经满了,所以它们是同一个槽。
        // 以上意思是从sendq.first取出的*sudog需要恢复这个g并把数据放入缓存区,从缓存区取出下一个数据交换
        qp := chanbuf(c, c.recvx) // 数据区的下一个需要交换的数据
        if raceenabled {
            racenotify(c, c.recvx, nil)
            racenotify(c, c.recvx, sg)
        }
        // copy data from queue to receiver
        // 将数据从队列复制到接收器
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)    // ep = qp
        }
        // copy data from sender to queue
        // 从发送端复制数据到队列
        typedmemmove(c.elemtype, qp, sg.elem)   // qp = sg.elem
        c.recvx++	// 下一次recv数据索引
        if c.recvx == c.dataqsiz {	// 如果超过最大重置为0
            c.recvx = 0
        }
        // 设置缓存区是满的,因为这种情况缓存区一定是full
        // 此时需要调整 sendx 的值,因为之前就是满的 说明之前 c.sendx == c.recvx
        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    }
    // sg 中的 *sudog 的g需要被恢复让其从新被调度
    sg.elem = nil   // 数据已被放入缓存区,情况即可
    gp := sg.g      // 需要恢复的g
    unlockf()       // runtime.mutex 解锁
    // 当一个channel操作唤醒一个被阻塞的goroutine时,它将param设置为指向已完成阻塞操作的sudog。
    // select 拿着这个参数的值做比对,是哪个 case 就绪了。
    gp.param = unsafe.Pointer(sg) // 主要用与 select 语句唤醒后使用。
    sg.success = true // success 标记了 true,交换数据成功
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    goready(gp, skip+1)	// 放入队列中等待唤醒g
}

goready()

  1. 恢复gp也就是goroutine前进行栈切换到g0栈
1
2
3
4
5
func goready(gp *g, traceskip int) {
    systemstack(func() { // 切换到 g0 栈
        ready(gp, traceskip, true)
    })
}

ready()

  1. 标记gp准备运行。
  2. 参数:
    1. gp *g:准备恢复的 goroutine
    2. traceskip int:测试相关
    3. next bool:true.下次调度优先调度当前goroutine
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Mark gp ready to run.
func ready(gp *g, traceskip int, next bool) {
    if trace.enabled {
        traceGoUnpark(gp, traceskip)
    }

    // goroutine 状态
    status := readgstatus(gp)

    // Mark runnable.
    _g_ := getg()    // 当前g0
    // 禁止当前m被抢占,因为即将把gp放入m关联的本地P中去
    mp := acquirem() // disable preemption because it can be holding p in a local var
    // 如果当前gp状态处理后不等于_Gwaiting等待中,那说明这个gp是有问题的
    if status&^_Gscan != _Gwaiting {
        dumpgstatus(gp)
        throw("bad g->status in ready")
    }

    // status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
    // 把当前gp这个goroutine状态从_Gwaiting等待中修改为_Grunnable可以运行的状态
    casgstatus(gp, _Gwaiting, _Grunnable)
    // 然后把gp放入m的本地关联P中,next为true表示放入前面
    runqput(_g_.m.p.ptr(), gp, next)
    wakep()         // 尝试唤醒其他线程
    releasem(mp)    // 解除前面的 acquirem() 函数的抢占,并判断是否有抢占发生
}

runqput()

  1. runqput 试图将g放到本地可运行队列上。
  2. 如果 next 为 false, runqput 将 g 添加到可运行队列的尾部。
  3. 如果 next 为 true, runqput 将 g 放入_p_.runnext位置。
  4. 如果就绪队列已满,runnext 将 g 放置到全局队列中。
  5. 仅由所有者P执行。
  6. 参数:
    1. _p_ *p:当前工作线程绑定的 P
    2. gp *g:需要处理的 goroutine
    3. next bool:true.下次调度优先调度当前goroutine
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// runqput tries to put g on the local runnable queue.
// If next is false, runqput adds g to the tail of the runnable queue.
// If next is true, runqput puts g in the _p_.runnext slot.
// If the run queue is full, runnext puts g on the global queue.
// Executed only by the owner P.
func runqput(_p_ *p, gp *g, next bool) {
    // 这里是为了测试用例增加随机性的代码
    // randomizeScheduler在测试代码中这里会设置为true,正常是false
    if randomizeScheduler && next && fastrandn(2) == 0 {
        next = false
    }

    // next=true,将gp添加到_p_.runnext中。_p_.runnext 如果是非nil,是一个可运行的G
    // 如果在运行 G 的时间片中有剩余时间,那么应该运行下一个而不是 runq 中的内容
    if next {
    retryNext:
        oldnext := _p_.runnext  // 原子交换
        if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
            goto retryNext
        }
        if oldnext == 0 {
            return
        }
        // Kick the old runnext out to the regular run queue.
        // 把旧的runnext踢到常规的run队列。
        gp = oldnext.ptr()
    }

retry:
    // _p_的runq是一个256大小的循环数组,runqhead指向开始,runqtail指向尾部
    h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
    t := _p_.runqtail   // 这里之所以没有使用锁,是由于这个runqtail在其他地方不会被修改
    // 如果t-h小于总runq的大小,说明还没有存满
    if t-h < uint32(len(_p_.runq)) {
        // 这里使用t%uint32(len(_p_.runq))是由于可能出现h > t的情况,那么需要始终放在t的后面
        _p_.runq[t%uint32(len(_p_.runq))].set(gp)
        atomic.StoreRel(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
        return
    }
    // 如果本地P已经存满,那么需要去全局里面存数据 
    if runqputslow(_p_, gp, h, t) {
        return
    }
    // the queue is not full, now the put above must succeed
    // 如果上面都没有存储成功,那么跳转到retry标签继续存储数据,直到成功
    goto retry
}

runqputslow()

  1. gp_p_中的一半的G尝试加入全局G中去
  2. 将g和一批来自本地可运行队列的工作放到全局队列上。
  3. 仅由所有者P执行。
  4. 参数:
    1. _p_ *p:当前工作线程绑定的P
    2. gp *g:当前需要处理的 goroutine
    3. h, t uint32_p_的runq是一个256大小的循环数组,runqhead指向开始,runqtail指向尾部
  5. 返回值:
    1. bool:true.放入成功,false.放入失败
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// Put g and a batch of work from local runnable queue on global queue.
// Executed only by the owner P.
func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
    // 首先定义batch数组,这是需要取出的G放入的数组
    // 这里的len(_p_.runq)/2 + 1是把gp放入这个+1这里算在一起的
    var batch [len(_p_.runq)/2 + 1]*g   // 临时容器

    // First, grab a batch from local queue.
    // 首先,从本地队列中获取一个批。
    n := t - h
    n = n / 2   // 取一半
    if n != uint32(len(_p_.runq)/2) {
        throw("runqputslow: queue is not full")
    }
    // 取出一半放入容器
    for i := uint32(0); i < n; i++ {
        batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
    }
    // 原子交换head索引值
    if !atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume
        return false
    }
    batch[n] = gp   // gp 放入最后一位

    // 这里如果开始起了随机性,那么会把batch顺序打乱
    if randomizeScheduler {
        for i := uint32(1); i <= n; i++ {
            j := fastrandn(i + 1)   // fastrand() % (i+1)
            batch[i], batch[j] = batch[j], batch[i]
        }
    }

    // Link the goroutines.
    // 把batch中的所有G形成一个链表链接起来
    for i := uint32(0); i < n; i++ {
        batch[i].schedlink.set(batch[i+1])
    }
    // 是一个双向链表,head和tail分别表示正序和倒叙
    var q gQueue
    q.head.set(batch[0])    // 把batch[0]指向q.head
    q.tail.set(batch[n])    // 把batch[n]指向q.tail

    // Now put the batch on global queue.
    // 现在将batch放到全局队列中。
    lock(&sched.lock)       // 获取 sched 上的 runtime.mutex
    // 把设置好的G链表链接拿到shced.runq上去
    globrunqputbatch(&q, int32(n+1))
    unlock(&sched.lock)     // 解锁
    return true
}

globrunqputbatch()

  1. 把设置好的G链表链接拿到shced.runq上去
  2. 将一批可运行的 goroutines 放到全局可运行队列中。清除 *batch。
  3. sched.lock 必须被持有。.
  4. 可能在STW期间运行,因此不允许写入障碍。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// Put a batch of runnable goroutines on the global runnable queue.
// This clears *batch.
// sched.lock must be held.
// May run during STW, so write barriers are not allowed.
//
//go:nowritebarrierrec
func globrunqputbatch(batch *gQueue, n int32) {
    // 判断 sched.lock 锁是否持有
    assertLockHeld(&sched.lock)

    // 把设置好的G链表链接到sched.runq上去
    sched.runq.pushBackAll(*batch)
    sched.runqsize += n // 把全局链表总数量加上n
    *batch = gQueue{}   // 清空这个batch,减轻GC压力
}

v, ok := <-c

  1. 编译后是通过调用 chanrecv2 函数。
  2. ok:true.确实从channel中接收的值(不是因为channel关闭而得到的零值)。false.因为channel关闭而返回的零值。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// 通过编译后:
//
// if v, ok := <-c; ok {
//      ... foo
// }
//
// as 
//
// if _, ok = chanrecv2(c, &v); ok {
//      ... foo
// }

//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    // 1. (_, true):数据交换成功,得到对应交换的数据。
    // 2. (_, false):因close导致获取到了零值数据。
    _, received = chanrecv(c, elem, true)
    return
}

gopark()

  1. 将当前例程置于等待状态并调用系统堆栈上的 unlock。
  2. 如果 unlock 返回 false,则继续执行该 goroutine。
  3. unlockf 不能访问这个 G 的堆栈,因为它可能在调用 gopark 和调用 unlockf 之间移动。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// Puts the current goroutine into a waiting state and calls unlockf on the
// system stack.
//
// If unlockf returns false, the goroutine is resumed.
//
// unlockf must not access this G's stack, as it may be moved between
// the call to gopark and the call to unlockf.
//
// Note that because unlockf is called after putting the G into a waiting
// state, the G may have already been readied by the time unlockf is called
// unless there is external synchronization preventing the G from being
// readied. If unlockf returns false, it must guarantee that the G cannot be
// externally readied.
//
// Reason explains why the goroutine has been parked. It is displayed in stack
// traces and heap dumps. Reasons should be unique and descriptive. Do not
// re-use reasons, add new ones.
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
    // const waitReasonSleep = 19
    // 等待原因不是因为sleep时,按需调用checkTimeouts(),检查timer。
    if reason != waitReasonSleep {
        // timeouts 可能会在两个goroutine使调度程序繁忙时过期。
        // 检查 p.timers 在调度循环时或 goroutine 被调离CPU 或 sysmon 监控线程中都会轮询查看。
        checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
    }
    mp := acquirem()
    gp := mp.curg // 当前gp
    status := readgstatus(gp) // 获取状态
    if status != _Grunning && status != _Gscanrunning {
        throw("gopark: bad g status")
    }
    mp.waitlock = lock      // 等待的锁,unlockf的第二个参数
    mp.waitunlockf = unlockf// 调离前需要执行的闭包
    gp.waitreason = reason
    mp.waittraceev = traceEv
    mp.waittraceskip = traceskip
    releasem(mp)
    // can't do anything that might move the G between Ms here.
    mcall(park_m) // mcall保存现场并切换g0调用park_m函数。
}

park_m()

  1. 该函数在g0上。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// park continuation on g0.
func park_m(gp *g) {
    mp := getg().m

    if trace.enabled {
        traceGoPark(mp.waittraceev, mp.waittraceskip)
    }

    // N.B. Not using casGToWaiting here because the waitreason is
    // set by park_m's caller.
    casgstatus(gp, _Grunning, _Gwaiting) // 切换gp的状态
    dropg() // 解除m与gp的绑定

    if fn := mp.waitunlockf; fn != nil {
        ok := fn(gp, mp.waitlock) // 调用waitunlockf
        mp.waitunlockf = nil
        mp.waitlock = nil
        // 返回false时,再次运行gp
        if !ok {
            if trace.enabled {
                traceGoUnpark(gp, 2)
            }
            casgstatus(gp, _Gwaiting, _Grunnable)
            execute(gp, true) // Schedule it back, never returns.
        }
    }
    schedule() // 调度循环
}

close()

  1. 关闭 hchan。
  2. 几种关闭 channel 的情况:
    1. close 关闭时,sendq 上有等待的 goroutine,会 panic。【“panic: send on closed channel”】。
    2. close 关闭时,再有 send 操作,会 panic。【“panic: send on closed channel”】。
    3. close 关闭时,buf 中有数据,不会 panic,recv 可以读取它们。
  3. 向 nil 的 channel,send 或 recv 时当前 goroutine 会panic。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
func closechan(c *hchan) {
    // 1) nil 的 chan 是不允许被 close 的
    if c == nil {	
        panic(plainError("close of nil channel"))
    }

    lock(&c.lock) // 尝试获取 runtime.mutex
    // 2) 已经关闭的 chan 不能再次关闭
    if c.closed != 0 {	
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }

    if raceenabled {
        callerpc := getcallerpc()
        racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
        racerelease(c.raceaddr())
    }

    // 3) 获取到互斥锁后首先标记closed字段
    c.closed = 1 // 标记chan状态为关闭 0.未关闭 1.已关闭

    // gList是一个goroutine的链表
    // 当前要关闭的c还未处理的goroutine
    var glist gList

    // 4) 处理 recvq 上的 goroutine

    // release all readers
    for {
        // 如果同一个goroutine在多个channel上时(这种情况发生在goselect时)
        // dequeue() 函数有相关的排重,CAS操作。
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        // 主要用与 select 语句唤醒后使用。
        gp.param = unsafe.Pointer(sg) // g.param = *sudog
        sg.success = false
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }

    // 5) 处理 sendq 上的 goroutine,这些goroutine得到运行后会 panic。
    // 因为不能向close的channel有send操作,其中一种情况体现在这里。

    // release all writers (they will panic)
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        // 主要用与 select 语句唤醒后使用。
        gp.param = unsafe.Pointer(sg) // g.param = *sudog
        sg.success = false
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }
    unlock(&c.lock)

    // 6) 将这些 goroutine 放回等待队列中

    // Ready all Gs now that we've dropped the channel lock.
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        // 将gp放入本地P的队列中
        goready(gp, 3)
    }
}

type guintptr uintptr

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// A guintptr holds a goroutine pointer, but typed as a uintptr
// to bypass write barriers. It is used in the Gobuf goroutine state
// and in scheduling lists that are manipulated without a P.
//
// The Gobuf.g goroutine pointer is almost always updated by assembly code.
// In one of the few places it is updated by Go code - func save - it must be
// treated as a uintptr to avoid a write barrier being emitted at a bad time.
// Instead of figuring out how to emit the write barriers missing in the
// assembly manipulation, we change the type of the field to uintptr,
// so that it does not require write barriers at all.
//
// Goroutine structs are published in the allg list and never freed.
// That will keep the goroutine structs from being collected.
// There is never a time that Gobuf.g's contain the only references
// to a goroutine: the publishing of the goroutine in allg comes first.
// Goroutine pointers are also kept in non-GC-visible places like TLS,
// so I can't see them ever moving. If we did want to start moving data
// in the GC, we'd need to allocate the goroutine structs from an
// alternate arena. Using guintptr doesn't make that problem any worse.
// Note that pollDesc.rg, pollDesc.wg also store g in uintptr form,
// so they would need to be updated too if g's start moving.
type guintptr uintptr

//go:nosplit
func (gp guintptr) ptr() *g { return (*g)(unsafe.Pointer(gp)) }

//go:nosplit
func (gp *guintptr) set(g *g) { *gp = guintptr(unsafe.Pointer(g)) }

//go:nosplit
func (gp *guintptr) cas(old, new guintptr) bool {
    return atomic.Casuintptr((*uintptr)(unsafe.Pointer(gp)), uintptr(old), uintptr(new))
}

type gList struct

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// A gList is a list of Gs linked through g.schedlink. A G can only be
// on one gQueue or gList at a time.
type gList struct {
    head guintptr
}

// empty reports whether l is empty.
func (l *gList) empty() bool {
    return l.head == 0
}

// push adds gp to the head of l.
func (l *gList) push(gp *g) {
    gp.schedlink = l.head
    l.head.set(gp)
}

// pushAll prepends all Gs in q to l.
func (l *gList) pushAll(q gQueue) {
    if !q.empty() {
        q.tail.ptr().schedlink = l.head
        l.head = q.head
    }
}

// pop removes and returns the head of l. If l is empty, it returns nil.
func (l *gList) pop() *g {
    gp := l.head.ptr()
    if gp != nil {
        l.head = gp.schedlink
    }
    return gp
}

len()

  1. 获取 chan 的元素个数。(获取的是缓存区的个数,没有挂在链表中的数量)
1
2
3
4
5
6
7
//go:linkname reflect_chanlen reflect.chanlen
func reflect_chanlen(c *hchan) int {
    if c == nil {
        return 0
    }
    return int(c.qcount)
}
1
2
3
4
5
6
7
//go:linkname reflectlite_chanlen internal/reflectlite.chanlen
func reflectlite_chanlen(c *hchan) int {
    if c == nil {
        return 0
    }
    return int(c.qcount)
}

cap()

  1. 返回值和len()函数一致。
1
2
3
4
5
6
7
//go:linkname reflect_chancap reflect.chancap
func reflect_chancap(c *hchan) int {
    if c == nil {
        return 0
    }
    return int(c.dataqsiz)
}

select default

  1. 以下是存在 default 默认分支情况。意思类似于 tryLock 函数,尝试一次。
  2. 以下只是特例,编译器不采用 goselect() 函数时。都是 select {case: default:} 这种形式。

select 中 c <- v

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// compiler implements
//
//	select {
//	case c <- v:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selectnbsend(c, v) {
//		... foo
//	} else {
//		... bar
//	}
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
    // true. 数据send完成。
    // false. 表示目前不能发送,因为不想阻塞(block为false)而返回。
    // false只在这里被传入使用。
    return chansend(c, elem, false, getcallerpc())
}
  1. 验证上面代码。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func chanTs() {
    ch := make(chan int)
    a, b := 1, 2

    select {
    case ch <- a:
        a = b
    default:
        b = a
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
TEXT main.chanTs(SB) G:/workspace/hello/main.go
func chanTs() {
  0x490ae0              493b6610                CMPQ 0x10(R14), SP
  0x490ae4              0f8685000000            JBE 0x490b6f
  0x490aea              4883ec40                SUBQ $0x40, SP
  0x490aee              48896c2438              MOVQ BP, 0x38(SP)
  0x490af3              488d6c2438              LEAQ 0x38(SP), BP
        ch := make(chan int)
  0x490af8              488d05419b0000          LEAQ runtime.rodata+30272(SB), AX
  0x490aff              31db                    XORL BX, BX
  0x490b01              e8da3cf7ff              CALL runtime.makechan(SB)
  0x490b06              4889442428              MOVQ AX, 0x28(SP)
        a, b := 1, 2
  0x490b0b              48c744241801000000      MOVQ $0x1, 0x18(SP)
  0x490b14              48c744241002000000      MOVQ $0x2, 0x10(SP)
        case ch <- a:
  0x490b1d              488b4c2428              MOVQ 0x28(SP), CX
  0x490b22              48894c2430              MOVQ CX, 0x30(SP)
  0x490b27              488b4c2418              MOVQ 0x18(SP), CX
  0x490b2c              48894c2420              MOVQ CX, 0x20(SP)
  0x490b31              488b442430              MOVQ 0x30(SP), AX   # 参数 c
  0x490b36              488d5c2420              LEAQ 0x20(SP), BX   # 参数 elem
  0x490b3b              0f1f440000              NOPL 0(AX)(AX*1)
  0x490b40              e89b55f7ff              CALL runtime.selectnbsend(SB) # 调用selectnbsend
  0x490b45              84c0                    TESTL AL, AL
  0x490b47              7502                    JNE 0x490b4b
  0x490b49              eb0c                    JMP 0x490b57
                a = b
  0x490b4b              488b442410              MOVQ 0x10(SP), AX
  0x490b50              4889442418              MOVQ AX, 0x18(SP)
  0x490b55              eb0c                    JMP 0x490b63
                b = a
  0x490b57              488b442418              MOVQ 0x18(SP), AX
  0x490b5c              4889442410              MOVQ AX, 0x10(SP)
  0x490b61              eb00                    JMP 0x490b63
        case ch <- a:
  0x490b63              eb00                    JMP 0x490b65
}
  0x490b65              488b6c2438              MOVQ 0x38(SP), BP
  0x490b6a              4883c440                ADDQ $0x40, SP
  0x490b6e              c3                      RET
func chanTs() {
  0x490b6f              e8ecb4fcff              CALL runtime.morestack_noctxt.abi0(SB)
  0x490b74              e967ffffff              JMP main.chanTs(SB)

  0x490b79              cc                      INT $0x3
  0x490b7a              cc                      INT $0x3
  0x490b7b              cc                      INT $0x3
  0x490b7c              cc                      INT $0x3
  0x490b7d              cc                      INT $0x3
  0x490b7e              cc                      INT $0x3
  0x490b7f              cc                      INT $0x3

select 中 v <- c

  1. go1.18版本前。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// compiler implements
//
//  select {
//  case v = <-c:
//      ... foo
//  default:
//      ... bar
//  }
//
// as
//
//  if selectnbrecv(&v, c) {
//      ... foo
//  } else {
//      ... bar
//  }
//
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
    // selected:表示当前分支是否选中
    selected, _ = chanrecv(c, elem, false)
    return
}

select 中 v, ok = <- c

  1. go1.18版本前。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// compiler implements
//
//  select {
//  case v, ok = <-c:
//      ... foo
//  default:
//      ... bar
//  }
//
// as
//
//  if c != nil && selectnbrecv2(&v, &ok, c) {
//      ... foo
//  } else {
//      ... bar
//  }
//
func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
    // TODO(khr): just return 2 values from this function, now that it is in Go.
    // selected:表示当前分支是否选中
    // received:表示当前是否因为 close 而关闭的零值
    selected, *received = chanrecv(c, elem, false)
    return
}

其他版本

  1. 在go1.19.3中,<-c 有所改变但是原理都一样。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// compiler implements
//
//  select {
//  case v, ok = <-c:
//      ... foo
//  default:
//      ... bar
//  }
//
// as
//
//  if selected, ok = selectnbrecv(&v, c); selected {
//      ... foo
//  } else {
//      ... bar
//  }
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
    // selected:表示当前分支是否选中
    // received:表示当前是否因为 close 而关闭的零值
    return chanrecv(c, elem, false)
}

//
//  select {
//  case v = <-c:
//      ... foo
//  default:
//      ... bar
//  }
//
// as 
//
//  if selected, _ = selectnbrecv(&v, c); selected {
//      ... foo
//  } else {
//      ... bar
//  }
  1. 验证上面代码。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
func chanTs() {
    ch := make(chan int)
    a, b := 1, 2
    var ok bool

    select {
    case a, ok = <-ch:
        b = a
        if ok {
            b = 100
        }
    default:
        a = b
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
TEXT main.chanTs(SB) G:/workspace/hello/main.go
func chanTs() {
  0x490ae0              493b6610                CMPQ 0x10(R14), SP
  0x490ae4              0f86aa000000            JBE 0x490b94
  0x490aea              4883ec48                SUBQ $0x48, SP
  0x490aee              48896c2440              MOVQ BP, 0x40(SP)
  0x490af3              488d6c2440              LEAQ 0x40(SP), BP
        ch := make(chan int)
  0x490af8              488d05419b0000          LEAQ runtime.rodata+30272(SB), AX
  0x490aff              31db                    XORL BX, BX
  0x490b01              e8da3cf7ff              CALL runtime.makechan(SB)
  0x490b06              4889442430              MOVQ AX, 0x30(SP)
        a, b := 1, 2
  0x490b0b              48c744242001000000      MOVQ $0x1, 0x20(SP)
  0x490b14              48c744241802000000      MOVQ $0x2, 0x18(SP)
        var ok bool
  0x490b1d              c644241500              MOVB $0x0, 0x15(SP)
        case a, ok = <-ch:
  0x490b22              488b5c2430              MOVQ 0x30(SP), BX
  0x490b27              48895c2438              MOVQ BX, 0x38(SP)
  0x490b2c              488d442428              LEAQ 0x28(SP), AX
  0x490b31              e8aa55f7ff              CALL runtime.selectnbrecv(SB) # selectnbrecv
  0x490b36              88442416                MOVB AL, 0x16(SP)
  0x490b3a              885c2417                MOVB BL, 0x17(SP)
  0x490b3e              807c241600              CMPB $0x0, 0x16(SP)
                if ok {
  0x490b64              807c241500              CMPB $0x0, 0x15(SP)
  0x490b69              7502                    JNE 0x490b6d
  0x490b6b              eb0b                    JMP 0x490b78
                        b = 100
  0x490b6d              48c744241864000000      MOVQ $0x64, 0x18(SP)
  0x490b76              eb02                    JMP 0x490b7a
                if ok {
  0x490b78              eb00                    JMP 0x490b7a
        case a, ok = <-ch:
  0x490b7a              eb0c                    JMP 0x490b88
                a = b
  0x490b7c              488b442418              MOVQ 0x18(SP), AX
  0x490b81              4889442420              MOVQ AX, 0x20(SP)
  0x490b86              eb00                    JMP 0x490b88
        case a, ok = <-ch:
  0x490b88              eb00                    JMP 0x490b8a
}
  0x490b8a              488b6c2440              MOVQ 0x40(SP), BP
  0x490b8f              4883c448                ADDQ $0x48, SP
  0x490b93              c3                      RET
func chanTs() {
  0x490b94              e8c7b4fcff              CALL runtime.morestack_noctxt.abi0(SB)
  0x490b99              e942ffffff              JMP main.chanTs(SB)

  0x490b9e              cc                      INT $0x3
  0x490b9f              cc                      INT $0x3

for range

  1. 参考 流程控制(range迭代)

总结

  1. 向(没在select块中) nil 的 channel 中 send、recv 会 panic。在select块中 nil 的 channel 会被丢弃。
  2. send 总是先判断的 close(panic),再判断的数据能否交换(select中的也一样)。recv 也是先判断 close 但是不会panic,再判断 buf 有没数据,有则取出,不会去 sendq 中查看挂起的 sudog。
  3. nil 的 channel 不允许 close 操作,会 panic。close 已经 close 的 channel 会 panic。

参考

  1. https://mp.weixin.qq.com/s/6ZEGtXRGKm2qP5b-rGLyVg
  2. https://mp.weixin.qq.com/s?__biz=Mzg5NjIwNzIxNQ==&mid=2247484471&idx=2&sn=49af599b9c3796857459a14d040586fd&scene=19#wechat_redirect