• 本篇介绍Golang相关调度代码,本篇也是理解GMP模型的重点篇节。

runtime·mstart(SB)

工作线程M的自旋状态(spinning)解释:工作线程在从其它工作线程的本地运行队列中盗取goroutine时的状态称为自旋状态

  1. 该函数是所有新创建的工作线程需要执行的函数,也是调度循环的入口函数。
  2. 所有【新创建的工作线程】开始运行的入口都是从这个函数开始运行的。
  3. 文件位置:go1.19.3/src/runtime/asm_amd64.s
389
390
391
392
393
TEXT runtime·mstart(SB),NOSPLIT|TOPFRAME,$0
    # 调用runtime.mstart0函数,该函数永远不会返回
    CALL    runtime·mstart0(SB) 
    # 未达到。不会到这里来。
    RET # not reached

mstart0()

  1. mstart0是新MsGo入口点。该函数是不允许栈增长检查的,因为我们甚至可能还没有设置堆栈边界。
  2. 能在STW期间运行(因为它还没有P),所以不允许写屏障。
  3. 初始化g0栈大小,以及调用mstart1()函数开启调度循环。
  4. 因为可能存在其他刚创建的工作线程并没有初始化g0栈大小,所以这里需要设置一下。
  5. 文件位置:go1.19.3/src/runtime/proc.go
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
// mstart0 is the Go entry-point for new Ms.
// This must not split the stack because we may not even have stack
// bounds set up yet.
//
// May run during STW (because it doesn't have a P yet), so write
// barriers are not allowed.
//
//go:nosplit
//go:nowritebarrierrec
func mstart0() {
    // 该函数是工作线程M起来执行的入口函数,这里一定是g0栈。
    _g_ := getg()   // _g_ = g0

    // 当前g是否已分配栈:
    //  1. 程序刚初始化时前面是分配了大约64KB大小栈。
    //  2. 如果是通过wakeup()函数创建的工作线程,这里可能是没有分配栈大小的。
    osStack := _g_.stack.lo == 0    // 判断当前g0是否分配栈
    if osStack { // 栈未分配大小时,这也是新创建的工作线程需要处理的g栈情况
        // Initialize stack bounds from system stack.
        // Cgo may have left stack size in stack.hi.
        // minit may update the stack bounds.
        // 
        // 从系统栈初始化栈边界。Cgo 可能在 stack.hi 中留下了栈大小。minit 可能会更新栈边界。
        //
        // Note: these bounds may not be very accurate.
        // We set hi to &size, but there are things above
        // it. The 1024 is supposed to compensate this,
        // but is somewhat arbitrary.
        // 
        // 注意:这些界限可能不是很准确。
        // 我们将 hi 设置为 &size,但是上面还有一些东西。 
        // 1024 应该可以弥补这一点,但有些武断。
        
        // 以上的意思是直接在当前工作线程系统栈上给当前这个g0分配栈大小。
        // 可能上面有栈数据,偏移1024字节应该能弥补这些数据。
        size := _g_.stack.hi // size 多半是0; size 一定是分配在当前栈上的,因此&size就是栈地址。
        if size == 0 {	
            // sys.StackGuardMultiplier = 1; 可见其他工作线程的g0栈大约为8KB。
            size = 8192 * sys.StackGuardMultiplier  // 设置size为指定值
        }
        
        // noescape函数取size地址并与0异或,实际作用是隐藏(防止)size变量逃逸分析指针
        // 防止编译器把size变量堆分配,这里需要的是栈分配
        // 因此当前_g_的栈备份分配到当前栈的size变量位置
        // noescape函数:
        // func noescape(p unsafe.Pointer) unsafe.Pointer {
        //      x := uintptr(p)
        //      return unsafe.Pointer(x ^ 0)    // 防止变量x逃逸
        // }
        // 以&size为起点设置g0栈。g0.stack -> [&size, &size - 8192 + 1024]
        _g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size))) // hi存储size的地址,也就是rbp
        _g_.stack.lo = _g_.stack.hi - size + 1024   // lo存储栈顶位置,也就是rsp
    }
    
    // Initialize stack guard so that we can start calling regular
    // Go code.
    // 
    // 初始化栈保护,以便我们可以开始调用常规 Go 代码。
    // stackguard0 = _g_.stack.lo + 928; 栈溢出检查的阈值点。
    _g_.stackguard0 = _g_.stack.lo + _StackGuard
    
    // This is the g0, so we can also call go:systemstack
    // functions, which check stackguard1.
    // 
    // 这是 g0,所以我们也可以调用 go:systemstack 函数来检查 stackguard1。
    _g_.stackguard1 = _g_.stackguard0
    mstart1() // 调用mstart1开启调度循环,该函数永远不会返回

    // Exit this thread.	
    // 
    // 退出这个线程,程序不会到这里。
    if mStackIsSystemAllocated() {
        // Windows, Solaris, illumos, Darwin, AIX and Plan 9 always system-allocate
        // the stack, but put it in _g_.stack before mstart,
        // so the logic above hasn't set osStack yet.
        // 
        // Windows、Solaris、illumos、Darwin、AIX 和Plan 9 总是系统分配栈,但是在mstart 之前放在_g_.stack 中,
        // 所以上面的逻辑还没有设置osStack
        osStack = true
    }
    mexit(osStack) // 结束当前线程。
}
  1. 总结:工作线程开始执行使,先判断g0是否分配了栈大小,没有则分配大约8KB大小栈空间,然后设置stackguard0stackguard1

mstart1()

  1. 设置g0被调度时的调度信息,比如从哪里进入,栈从哪里开始等,以及给当前工作线程M绑定个P并开启调度循环。
  2. 设置g0的调度信息是在于,在调度循环过程中会切换到g0栈执行runtime的相关函数,以免栈无限扩大。
  3. 文件位置:go1.19.3/src/runtime/proc.go
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
// The go:noinline is to guarantee the getcallerpc/getcallersp below are safe,
// so that we can set up g0.sched to return to the call of mstart1 above.
//go:noinline
func mstart1() {
    _g_ := getg()   // _g_ = g0

    // 当前_g_ 一定是g0,因为该函数只有程序初始化或线程刚启动时才会调用。
    if _g_ != _g_.m.g0 {
        throw("bad runtime·mstart")
    }

    // Set up m.g0.sched as a label returning to just
    // after the mstart1 call in mstart0 above, for use by goexit0 and mcall.
    // We're never coming back to mstart1 after we call schedule,
    // so other calls can reuse the current frame.
    // And goexit0 does a gogo that needs to return from mstart1
    // and let mstart0 exit the thread.
    // 
    // 将 m.g0.sched 设置为上面 mstart0 中 mstart1 调用后返回的标签,供 goexit0 和 mcall 使用
    // 在调用 schedule 之后,我们永远不会回到 mstart1,因此其他调用可以重用当前帧
    // 而goexit0做了一个gogo,需要从mstart1返回,让mstart0退出线程
    
    // g0.sched.g = g0
    _g_.sched.g = guintptr(unsafe.Pointer(_g_)) // 设置g0的调度信息是当前g
    // g0.sched.pc = getcallerpc()
    // getcallerpc():调用者函数的下一条指令,也就是mstart0()函数调用mstart1()函数后的if判断指令代码处。
    _g_.sched.pc = getcallerpc()
    // 这里是理解调度循环的关键,调度循环每次切换到g0栈都从这里(指定的固定位置)设置的位置开始使用栈。
    // g0.sched.pc = getcallersp()
    // getcallersp():调用者当前的SP寄存器值,也就是mstart0()函数调用mstart1()函数时SP寄存器的值。
    // 设置rsp寄存器值为master0调用master1时的栈顶处,设置在这里便于每次切换的g0栈都是从固定位置开始
    _g_.sched.sp = getcallersp() 

    // go1.19.3/src/runtime/asm_amd64.s
    // TEXT runtime·asminit(SB),NOSPLIT,$0-0
    //   // No per-thread init.
    //   RET
    // 没有什么可做的。
    asminit()   // 该函数的汇编代码什么都没做,初始化M工作线程
    minit()     // 调用以初始化一个新的 m(包括引导程序 m),在新线程上调用,无法分配内存

    // Install signal handlers; after minit so that minit can
    // prepare the thread to be able to handle the signals.
    // 
    // 安装信号处理程序; 在 minit 之后,以便 minit 可以准备线程以处理信号
    if _g_.m == &m0 {   // 判断g0绑定的m是否是m0,m0是main.goroutine也就是主线程时执行下面方法
        mstartm0()
    }

    // 如果_g_.m.mstartfn存在则执行该函数:
    //  1. 一般的情况下该函数是,mspinning()函数。该函数只有一条指令,标记m的mspining字段为true。
    //  2. 如果是sysmon线程下,这里是直接调用sysmon()函数,该函数是一个无限循环,这里不会返回。
    if fn := _g_.m.mstartfn; fn != nil {
        fn()
    }

    // 程序刚初始化时,一定是m0,因为在前面m0已经绑定了P,所以是m0需要跳过。
    // 通过wakeup()函数创建的新的工作线程m时,这里需要绑定个P。
    if _g_.m != &m0 { // 不是m0则需要给工作线程M绑定一个P
        // _g_.m.nextp.ptr()获取下一个P,nextp在wakeup()函数相关被赋值。
        acquirep(_g_.m.nextp.ptr()) // 调用acquirep(_g_.m.nextp.ptr())绑定p
        _g_.m.nextp = 0
    }
    // 开启循环调度,该函数永远不返回
    schedule()
}

minit()
  1. 初始化一个新的m(包括引导程序m)。在新线程上调用,无法分配内存。
  2. 文件位置:go1.19.3/src/runtime/os_linux.go
390
391
392
393
394
395
396
397
398
399
400
// Called to initialize a new m (including the bootstrap m).
// Called on the new thread, cannot allocate memory.
func minit() {
    // 初始化 Signals
    minitSignals()

    // Cgo-created threads and the bootstrap m are missing a
    // procid. We need this for asynchronous preemption and it's
    // useful in debuggers.
    getg().m.procid = uint64(gettid())
}
  1. 文件位置:go1.19.3/src/runtime/signal_unix.go
1192
1193
1194
1195
1196
1197
// minitSignals is called when initializing a new m to set the
// thread's alternate signal stack and signal mask.
func minitSignals() {
    minitSignalStack()
    minitSignalMask()
}

acquirep()
  1. 参数_p_ *p:空闲的pwakeup()函数时存入m.nextp处,供工作线程启动后绑定这个P
  2. 文件位置:go1.19.3/src/runtime/proc.go
4938
4939
4940
4941
4942
4943
4944
4945
4946
4947
4948
4949
4950
4951
4952
4953
4954
4955
4956
4957
4958
4959
// Associate p and the current m.
//
// This function is allowed to have write barriers even if the caller
// isn't because it immediately acquires _p_.
//
//go:yeswritebarrierrec
func acquirep(_p_ *p) {
    // Do the part that isn't allowed to have write barriers.
    wirep(_p_)	// 绑定P与当前工作线程M

    // Have p; write barriers now allowed.

    // Perform deferred mcache flush before this P can allocate
    // from a potentially stale mcache.
    // 
    // 在此 P 可以从可能过时的 mcache 分配之前执行延迟 mcache 刷新
    _p_.mcache.prepareForSweep()

    if trace.enabled {
        traceProcStart()
    }
}
  1. 绑定传入的P在当前工作线程。wirepacquirep的第一步,实际上是将当前的M关联到_p_
  2. 这是被打破的,所以我们可以禁止这部分的写屏障,因为我们还没有P
  3. 文件位置:go1.19.3/src/runtime/proc.go
4959
4960
4961
4962
4963
4964
4965
4966
4967
4968
4969
4970
4971
4972
4973
4974
4975
4976
4977
4978
4979
4980
4981
4982
4983
4984
4985
// wirep is the first step of acquirep, which actually associates the
// current M to _p_. This is broken out so we can disallow write
// barriers for this part, since we don't yet have a P.
//
//go:nowritebarrierrec
//go:nosplit
func wirep(_p_ *p) {
    _g_ := getg()     // 获取当前的g

    if _g_.m.p != 0 { // 当前工作线程如果绑定了P则有问题
        throw("wirep: already in go")
    }
    // 当前P绑定了工作线程存在问题,该P不是空闲的 或 当前P不处于空闲状态
    if _p_.m != 0 || _p_.status != _Pidle {
        id := int64(0)
        if _p_.m != 0 {
            id = _p_.m.ptr().id
        }
        print("wirep: p->m=", _p_.m, "(", id, ") p->status=", _p_.status, "\n")
        throw("wirep: invalid p state")
    }
    // 当前工作线程M绑定P
    _g_.m.p.set(_p_)  // m.p = _p_		
    // 当前P绑定工作线程M
    _p_.m.set(_g_.m)  // _p_.m = m  
    _p_.status = _Prunning    // 把当前P状态修改为_Prunning
}

schedule()

  1. 循环调度开始。每轮循环都从这里开始。该函数算是调度器的核心函数,运行起来的线程会一直执行它。
  2. 一轮调度器:找到一个可运行的goroutine并执行它。永不返回。
  3. 文件位置:go1.19.3/src/runtime/proc.go
3183
3184
3185
3186
3187
3188
3189
3190
3191
3192
3193
3194
3195
3196
3197
3198
3199
3200
3201
3202
3203
3204
3205
3206
3207
3208
3209
3210
3211
3212
3213
3214
3215
3216
3217
3218
3219
3220
3221
3222
3223
3224
3225
3226
3227
3228
3229
3230
3231
3232
3233
3234
3235
3236
3237
3238
3239
3240
3241
3242
3243
3244
3245
3246
3247
3248
3249
3250
3251
3252
3253
3254
3255
3256
3257
3258
3259
3260
3261
3262
3263
3264
3265
3266
3267
3268
3269
3270
3271
3272
3273
3274
3275
3276
3277
3278
3279
3280
3281
3282
3283
3284
3285
3286
3287
3288
3289
3290
3291
3292
// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
    // 获取当前正在运行的g,执行该函数时一般都是系统栈g0
    _g_ := getg() // _g_ = g0

    // 调度开始时 g0.m.locks = 0
    // 校验当前线程没有锁,不允许再持有锁的情况下进行调度,以免造成runtime内部错误
    // 因为m.locks的加锁和解锁时成对出现的,因此这里应该为0
    if _g_.m.locks != 0 {
        throw("schedule: holding locks")
    }

    // g0.m.lockedg = 0
    // 判断当前M有没有和G绑定,如果有,这个M就不能用来执行其他的G了,
    // 只能挂起等待绑定的G得到调度。
    if _g_.m.lockedg != 0 {
        stoplockedm()
        execute(_g_.m.lockedg.ptr(), false) // Never returns.
    }

    // We should not schedule away from a g that is executing a cgo call,
    // since the cgo call is using the m's g0 stack.
    // 
    // 我们不应该安排远离正在执行 cgo 调用的 g,因为 cgo 调用正在使用 m 的 g0 堆栈
    // 判断线程是不是正在进行cgo函数调用,这种情况下g0栈正在被cgo使用,所以也不允许调度。
    if _g_.m.incgo {
        throw("schedule: in cgo")
    }

top:
    pp := _g_.m.p.ptr() // pp = p
    // 通过把preempt字段设置为false,来禁止对P的抢占。
    pp.preempt = false

    // Safety check: if we are spinning, the run queue should be empty.
    // Check this before calling checkTimers, as that might call
    // goready to put a ready goroutine on the local run queue.
    // 
    // 安全检查:如果我们正在自旋,那么运行队列应该是空的。
    // 在调用checkTimers之前请检查这一点,因为这可能会调用goready将准备好的goroutine放入本地运行队列。
    // 对 spinning 的判断属于一致性检验,在P本地runq有任务的情况下,M不应该处于spinning状态。
    if _g_.m.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {
        throw("schedule: spinning with local work")
    }

    // 寻找一个可用的 goroutine
    // inheritTime:是否继承当前时间片
    //  1. true  继承当前时间片
    //  2. false 不继承当前时间片
    // tryWakeP:当前goroutine是否是普通的。也就是user goroutine。
    //  1. false 普通的goroutine。
    //  2. true  不是普通的goroutine,可能是GC work、trace reader需要唤醒P。
    // gp:当前找到的 goroutine。
    gp, inheritTime, tryWakeP := findRunnable() // blocks until work is available

    // This thread is going to run a goroutine and is not spinning anymore,
    // so if it was marked as spinning we need to reset it now and potentially
    // start a new spinning M.
    //
    // 这个线程将运行一个goroutine,不再旋转,所以如果它被标记为旋转,
    // 我们现在需要重置它,并可能启动一个新的旋转M。
    if _g_.m.spinning {
        // 重置当前M位非旋转,并尝试重新启动一个P标记为旋转。
        resetspinning()
    }

    // sched.disable.user:禁止调度用户goroutine。
    // !schedEnabled(gp):gp是user goroutine。
    // 可能来自GC,其中两种模式不允许GC期间运行user goroutine
    if sched.disable.user && !schedEnabled(gp) {
        // Scheduling of this goroutine is disabled. Put it on
        // the list of pending runnable goroutines for when we
        // re-enable user scheduling and look again.
        // 
        // 此goroutine的调度被禁用。
        // 当我们重新启用用户调度并再次查看时,将它放在挂起的可运行goroutine列表中。
        lock(&sched.lock)
        if schedEnabled(gp) {
            // Something re-enabled scheduling while we
            // were acquiring the lock.
            unlock(&sched.lock)
        } else {
            // user goroutine 时挂起
            sched.disable.runnable.pushBack(gp)
            sched.disable.n++
            unlock(&sched.lock)
            goto top
        }
    }

    // If about to schedule a not-normal goroutine (a GCworker or tracereader),
    // wake a P if there is one.
    //
    // 如果要调度一个不正常的goroutine (GCworker或tracereader),如果有P,则唤醒P。
    // 尝试换新一个新线程绑定P来工作。
    if tryWakeP {
        wakep() 
    }
    if gp.lockedm != 0 {
        // Hands off own p to the locked m,
        // then blocks waiting for a new p.
        //
        // 把自己的p交给锁住的m,然后block等待一个新的p。
        startlockedm(gp)
        goto top
    }

    execute(gp, inheritTime)
}

findRunnable()🚀

  1. 查找要执行的可运行goroutine。试图从其他P中窃取,从本地或全局队列、轮询网络中获取g。
  2. tryWakeP表示返回的不是普通的goroutineGC工作程序、跟踪读取器),因此调用者应该尝试唤醒P
  3. 文件位置:go1.19.3/src/runtime/proc.go
2651
2652
2653
2654
2655
2656
2657
2658
2659
2660
2661
2662
2663
2664
2665
2666
2667
2668
2669
2670
2671
2672
2673
2674
2675
2676
2677
2678
2679
2680
2681
2682
2683
2684
2685
2686
2687
2688
2689
2690
2691
2692
2693
2694
2695
2696
2697
2698
2699
2700
2701
2702
2703
2704
2705
2706
2707
2708
2709
2710
2711
2712
2713
2714
2715
2716
2717
2718
2719
2720
2721
2722
2723
2724
2725
2726
2727
2728
2729
2730
2731
2732
2733
2734
2735
2736
2737
2738
2739
2740
2741
2742
2743
2744
2745
2746
2747
2748
2749
2750
2751
2752
2753
2754
2755
2756
2757
2758
2759
2760
2761
2762
2763
2764
2765
2766
2767
2768
2769
2770
2771
2772
2773
2774
2775
2776
2777
2778
2779
2780
2781
2782
2783
2784
2785
2786
2787
2788
2789
2790
2791
2792
2793
2794
2795
2796
2797
2798
2799
2800
2801
2802
2803
2804
2805
2806
2807
2808
2809
2810
2811
2812
2813
2814
2815
2816
2817
2818
2819
2820
2821
2822
2823
2824
2825
2826
2827
2828
2829
2830
2831
2832
2833
2834
2835
2836
2837
2838
2839
2840
2841
2842
2843
2844
2845
2846
2847
2848
2849
2850
2851
2852
2853
2854
2855
2856
2857
2858
2859
2860
2861
2862
2863
2864
2865
2866
2867
2868
2869
2870
2871
2872
2873
2874
2875
2876
2877
2878
2879
2880
2881
2882
2883
2884
2885
2886
2887
2888
2889
2890
2891
2892
2893
2894
2895
2896
2897
2898
2899
2900
2901
2902
2903
2904
2905
2906
2907
2908
2909
2910
2911
2912
2913
2914
2915
2916
2917
2918
2919
2920
2921
2922
2923
2924
2925
2926
2927
2928
2929
2930
2931
2932
2933
2934
2935
2936
2937
2938
2939
2940
2941
2942
2943
2944
2945
2946
2947
2948
2949
2950
2951
2952
2953
2954
2955
2956
2957
2958
2959
2960
2961
2962
2963
2964
2965
2966
2967
2968
2969
2970
2971
2972
2973
2974
2975
2976
2977
2978
2979
2980
2981
2982
2983
2984
2985
2986
2987
2988
2989
2990
2991
2992
2993
2994
2995
2996
2997
2998
2999
3000
3001
3002
3003
3004
3005
3006
3007
3008
3009
3010
3011
3012
3013
3014
3015
3016
3017
3018
3019
3020
3021
3022
3023
3024
3025
3026
3027
3028
3029
3030
3031
3032
3033
3034
3035
3036
3037
3038
3039
3040
3041
3042
3043
3044
3045
3046
3047
3048
3049
3050
3051
3052
3053
3054
3055
3056
3057
3058
3059
3060
3061
3062
3063
3064
3065
3066
3067
3068
3069
3070
3071
3072
3073
3074
3075
3076
3077
3078
3079
3080
3081
3082
3083
3084
3085
3086
3087
3088
3089
3090
3091
3092
3093
3094
3095
3096
3097
3098
3099
3100
3101
3102
3103
3104
3105
3106
3107
3108
3109
3110
3111
3112
3113
3114
3115
3116
3117
3118
3119
3120
3121
3122
// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from local or global queue, poll network.
// tryWakeP indicates that the returned goroutine is not normal (GC worker, trace
// reader) so the caller should try to wake a P.
func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
    _g_ := getg() // _g_ = g0

    // The conditions here and in handoffp must agree: if
    // findrunnable would return a G to run, handoffp must start
    // an M.
    //
    // 这里和handoffp中的条件必须一致:如果findrunnable将返回一个G来运行,则handoffp必须启动一个M。

top:
    _p_ := _g_.m.p.ptr() // _p_ = p
    
    // 1) 帮助STW,抢占当前P。
    
    // STW 即将开始要求等待,挂起当前M。
    // 检测sched.gcwaiting,挂起自己,以便及时响应STW,
    // 调度逻辑中很多地方都有对gcwaiting的检测。
    if sched.gcwaiting != 0 { // 在GC的STW期间被设置
        gcstopm()
        goto top
    }
    
    // 2) 检查当前P是否到达安全点。
    
    // 如果当前P要求运行 runSafePointFn() 函数。
    // runSafePointFn() 函数被GC用来在安全点执行清空工作队列之类的操作。
    if _p_.runSafePointFn != 0 {
        runSafePointFn()
    }

    // 3) 去 timers 里看看,是否有到点的定时器。
    // 这里如果有 timer 到触发点了,会触发并执行注册函数。
    // 由于调度循环是以时间片形式调度的,因此 timer 的触发时间上线就是10ms。
    // 因此抢占能抢占超过10ms以上的goroutine?
    
    // checkTimers 为准备好的 P 运行任何timers
    // 如果 now 不为 0,则为当前时间,如果 now 被传递为 0,则返回传递的时间或当前时间
    // 以及下一个timer应该运行的时间,如果没有下一个计时器,则为 0,并报告它是否运行了任何计时器
    // 如果下一个timer应该运行的时间不为0,它总是大于返回的时间
    // func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool)
    //  参数:
    //    1. pp *p      当前需要检查的P
    //    2. now int64  当前时间,如果为0则取当前时间
    //  返回值:
    //    1. rnow int64:参数now的时间。
    //    2. pollUntil int64:>0.最近timer触发的时间点。0.没有任何timer。
    //    3. ran bool   timer里面是否存在已经延迟时间到点的g,true存在,false不存在
    
    // now and pollUntil are saved for work stealing later,
    // which may steal timers. It's important that between now
    // and then, nothing blocks, so these numbers remain mostly
    // relevant.
    // 
    // now和pollUntil被保存以备以后窃取工作,这可能会窃取timers。
    // 重要的是,从现在到那时,没有任何阻碍,所以这些数字仍然很重要。
    // 处理当前P相关的timers,可能存在一些在timer中的g到时间点了需要放回P中等待被执行
    now, pollUntil, _ := checkTimers(_p_, 0)

    // 4) 尝试安排 trace reader。
    
    // Try to schedule the trace reader.
    // 
    // 尝试安排 trace reader。
    if trace.enabled || trace.shutdown {
        gp = traceReader()
        if gp != nil {
            casgstatus(gp, _Gwaiting, _Grunnable)
            traceGoUnpark(gp, 0)
            return gp, false, true
        }
    }

    // 5) 写标记期间,尝试安排 GC worker。
    
    // Try to schedule a GC worker.
    // 尝试安排 GC worker。
    if gcBlackenEnabled != 0 { // 在GC【并发标记】期间被设置
        // 唤醒标记协程,参看GC文档
        gp, now = gcController.findRunnableGCWorker(_p_, now) // 获取到标记协程
        if gp != nil {
            return gp, false, true // 返回标记协程
        }
    }

    // 6) P调度次数每满61次需要去全局队列拿去goroutine,防止全局goroutine一直得不到运行。
    
    // Check the global runnable queue once in a while to ensure fairness.
    // Otherwise two goroutines can completely occupy the local runqueue
    // by constantly respawning each other.
    //
    // 偶尔检查一次全局可运行队列以确保公平性。
    // 否则,两个goroutine可以通过不断地相互重新部署来完全占据本地运行队列。
    // 为了保证调度的公平性,每个工作线程每进行61次调度就需要优先从全局运行队列中获取goroutine出来运行。
    // 因为如果只调度本地运行队列中的goroutine,则全局运行队列中的goroutine有可能得不到运行。
    // p.schedtick:记录调度发生的次数,实际上在每发生一次goroutine切换且不继承时间片的情况下,该字段会加一。
    // sched.runqsize:记录的是全局就绪队列的长度。也就是全局队列goroutine的个数。
    if _p_.schedtick%61 == 0 && sched.runqsize > 0 {
        lock(&sched.lock)        // 从sched中获取goroutine需要持有lock锁。
        // 从全局队列中获取1个goroutine,然后放入P的本地队列。
        gp = globrunqget(_p_, 1) // 只拿取一个
        unlock(&sched.lock)      // mutex 解锁
        if gp != nil {
            return gp, false, false
        }
    }

    // 7) 是否有 finalizer G。
    
    // Wake up the finalizer G.
    // 
    // 唤醒 finalizer G。
    // 该goroutine由runtime.SetFinalizer函数创造。
    // 只会创建一个goroutine。
    if fingwait && fingwake {
        if gp := wakefing(); gp != nil {
            ready(gp, 0, true)
        }
    }
    if *cgo_yield != nil {
        asmcgocall(*cgo_yield, nil)
    }

    // 8) 从P的本地队列拿去 goroutine。
    
    // local runq
    // 从本地P的队列中获取goroutine。
    if gp, inheritTime := runqget(_p_); gp != nil {
        return gp, inheritTime, false
    }

    // 9) 从全局队列中拿去goroutine。
    
    // global runq
    // 从全局队列中获取goroutine。
    if sched.runqsize != 0 {
        lock(&sched.lock)
        gp := globrunqget(_p_, 0) // 拿取多个
        unlock(&sched.lock)
        if gp != nil {
            return gp, false, false
        }
    }

    // 10) netpoll 是否有就绪的goroutine。
    
    // Poll network.
    // This netpoll is only an optimization before we resort to stealing.
    // We can safely skip it if there are no waiters or a thread is blocked
    // in netpoll already. If there is any kind of logical race with that
    // blocked thread (e.g. it has already returned from netpoll, but does
    // not set lastpoll yet), this thread will do blocking netpoll below
    // anyway.
    // 
    // netpollinited():判断netpoll是否已经初始化。
    // netpollWaiters:是否有等待的goroutine。
    // sched.lastpoll:上次网络轮询的时间点。为0时表示有线程正在阻塞式调用netpoll函数
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
        // netpoll(0):判断当前是否有就绪事件,0表示立即返回。非阻塞。
        if list := netpoll(0); !list.empty() { // non-blocking
            gp := list.pop() // 弹出一个goroutine。
            injectglist(&list) // 处理剩下的goroutine
            casgstatus(gp, _Gwaiting, _Grunnable) // 修改goroutine状态
            if trace.enabled {
                traceGoUnpark(gp, 0)
            }
            return gp, false, false
        }
    }

    // 11) 以上都没获取到goroutine。标记自旋尝试从其他P中偷取goroutine。
    
    // Spinning Ms: steal work from other Ps.
    //
    // Limit the number of spinning Ms to half the number of busy Ps.
    // This is necessary to prevent excessive CPU consumption when
    // GOMAXPROCS>>1 but the program parallelism is low.
    //
    // Spinning Ms:从其他P那里窃取goroutine。
    // 将自旋的M限制为繁忙M的一半。
    // 这是必要的,以防止在 GOMAXPROCS>>1 但程序并行性较低时过度消耗CPU。
    // 如果当前处于spinning状态的M的数量大于忙碌的P的数量的一半,就让当前M阻塞(休眠)。
    // 目的是避免在gomaxprocs较大而程序实际的并发性很低的情况下,造成不必要的CPU消耗。
    procs := uint32(gomaxprocs) // 获取当前P的数量
    // _g_.m.spinning == true:当前M处于自旋。
    // 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle):自旋是繁忙的一半还小时,标记当前M为自旋
    if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {
        if !_g_.m.spinning {
            // 满足偷取的时候才会标记M为自旋状态            
            _g_.m.spinning = true // 标记为自旋状态
            atomic.Xadd(&sched.nmspinning, 1) // 累加自旋M的数量
        }

        // 去其他P中偷取 goroutine。偷取其他P中goroutine的一半。
        // 从p.runnext中偷取的goroutine时,inheritTime该值为true,表示继承上个时间片。
        gp, inheritTime, tnow, w, newWork := stealWork(now)
        now = tnow // 更新当前时间
        // 成功偷取到goroutine
        if gp != nil {
            // Successfully stole.
            return gp, inheritTime, false
        }
        // newWork 某个P中有timer被触发了,在来一次调度循环
        if newWork {
            // There may be new timer or GC work; restart to
            // discover.
            goto top
        }
        // w不为0,表示最近触发的timer的时间点
        if w != 0 && (pollUntil == 0 || w < pollUntil) {
            // Earlier timer to wait for.
            pollUntil = w // 记录最近要触发的时间点
        }
    }
    
    // 12) 有GC标记工作这去帮助GC

    // We have nothing to do.
    //
    // If we're in the GC mark phase, can safely scan and blacken objects,
    // and have work to do, run idle-time marking rather than give up the P.
    //
    // 我们无事可做。
    // 如果我们在GC标记阶段,可以安全地扫描和变黑对象,并且有工作要做,运行空闲时间标记而不是放弃P。
    if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) && gcController.addIdleMarkWorker() {
        node := (*gcBgMarkWorkerNode)(gcBgMarkWorkerPool.pop())
        if node != nil {
            _p_.gcMarkWorkerMode = gcMarkWorkerIdleMode
            gp := node.gp.ptr()
            casgstatus(gp, _Gwaiting, _Grunnable)
            if trace.enabled {
                traceGoUnpark(gp, 0)
            }
            return gp, false, false
        }
        gcController.removeIdleMarkWorker()
    }

    // wasm only:
    // If a callback returned and no other goroutine is awake,
    // then wake event handler goroutine which pauses execution
    // until a callback was triggered.
    gp, otherReady := beforeIdle(now, pollUntil) // 在linux下返回 (nil, false)
    if gp != nil {
        casgstatus(gp, _Gwaiting, _Grunnable)
        if trace.enabled {
            traceGoUnpark(gp, 0)
        }
        return gp, false, false
    }
    if otherReady {
        goto top
    }
    
    // 13) 保存allp、idlepMask和timerpMask的快照
    // 当前工作线程即将休眠,休眠前再次去快照里面看看有没有工作要做

    // Before we drop our P, make a snapshot of the allp slice,
    // which can change underfoot once we no longer block
    // safe-points. We don't need to snapshot the contents because
    // everything up to cap(allp) is immutable.
    //
    // 在我们丢弃P之前,做一个allp切片的快照,一旦我们不再阻塞safe-points,它就会改变。
    // 我们不需要对内容进行快照,因为cap(allp)之前的所有内容都是不可变的。
    allpSnapshot := allp
    // Also snapshot masks. Value changes are OK, but we can't allow
    // len to change out from under us.
    // 
    // 还有快照掩码。值更改是可以的,但是我们不能允许len从我们下面更改。
    idlepMaskSnapshot := idlepMask
    timerpMaskSnapshot := timerpMask

    // 14) 在看一下gcwaiting和runSafePointFn,以及全局队列sched.runqsize
    
    // return P and block
    lock(&sched.lock)
    // 有GC等待 或 P有安全点函数执行
    if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
        unlock(&sched.lock)
        goto top
    }
    // 全局队列有 goroutine
    if sched.runqsize != 0 {
        // 因为 sched.lock 锁已被持有,所以一定能取出 gp。不为 nil。
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        return gp, false, false
    }
    
    // 15) 解除当前M与P的绑定关系,并把P加入全局空闲队列中
    
    // 解除m与p的绑定关系,并设置p为空闲状态。
    if releasep() != _p_ {
        throw("findrunnable: wrong p")
    }
    // 把P加入空闲队列
    now = pidleput(_p_, now)
    unlock(&sched.lock)

    // 16) 根据前面快照保存的信息,再次检查其他P是否可偷取,GC有没标记工作需要协助,timer有没触发
    
    // Delicate dance: thread transitions from spinning to non-spinning
    // state, potentially concurrently with submission of new work. We must
    // drop nmspinning first and then check all sources again (with
    // #StoreLoad memory barrier in between). If we do it the other way
    // around, another thread can submit work after we've checked all
    // sources but before we drop nmspinning; as a result nobody will
    // unpark a thread to run the work.
    //
    // This applies to the following sources of work:
    //
    // * Goroutines added to a per-P run queue.
    // * New/modified-earlier timers on a per-P timer heap.
    // * Idle-priority GC work (barring golang.org/issue/19112).
    //
    // If we discover new work below, we need to restore m.spinning as a signal
    // for resetspinning to unpark a new worker thread (because there can be more
    // than one starving goroutine). However, if after discovering new work
    // we also observe no idle Ps it is OK to skip unparking a new worker
    // thread: the system is fully loaded so no spinning threads are required.
    // Also see "Worker thread parking/unparking" comment at the top of the file.
    wasSpinning := _g_.m.spinning
    // 处理当前M是自旋状态
    if _g_.m.spinning {
        _g_.m.spinning = false // 标记当前M未非自旋
        if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
            throw("findrunnable: negative nmspinning")
        }

        // Note the for correctness, only the last M transitioning from
        // spinning to non-spinning must perform these rechecks to
        // ensure no missed work. We are performing it on every M that
        // transitions as a conservative change to monitor effects on
        // latency. See golang.org/issue/43997.

        // Check all runqueues once again.
        // 
        // 再次检查所有运行队列。
        // 检查是否有可偷取的P,如果有则取出一个空闲的P绑定M。
        _p_ = checkRunqsNoP(allpSnapshot, idlepMaskSnapshot)
        if _p_ != nil {
            acquirep(_p_) // 绑定P
            _g_.m.spinning = true
            atomic.Xadd(&sched.nmspinning, 1)
            goto top // 有工作可做再跑一遍调度循环
        }

        // Check for idle-priority GC work again.
        // 
        // 再次检查空闲优先级GC工作。是否有编辑工作需要做
        _p_, gp = checkIdleGCNoP()
        if _p_ != nil {
            acquirep(_p_) // 绑定P
            _g_.m.spinning = true
            atomic.Xadd(&sched.nmspinning, 1)

            // Run the idle worker.
            _p_.gcMarkWorkerMode = gcMarkWorkerIdleMode
            casgstatus(gp, _Gwaiting, _Grunnable)
            if trace.enabled {
                traceGoUnpark(gp, 0)
            }
            return gp, false, false
        }

        // Finally, check for timer creation or expiry concurrently with
        // transitioning from spinning to non-spinning.
        //
        // Note that we cannot use checkTimers here because it calls
        // adjusttimers which may need to allocate memory, and that isn't
        // allowed when we don't have an active P.
        //
        // 最后,再看看timer。
        pollUntil = checkTimersNoP(allpSnapshot, timerpMaskSnapshot, pollUntil)
    }

    // 17) network 是否有需要处理的goroutine,或timer是否即将触发,
    // 标记sched.lastpoll为0,阻塞式等待吧。netpoll中一些读写超时需要用到timer
    
    // Poll network until next timer.
    // 
    // Poll network 直到下一个 timer。
    //  netpollinited():netpoll 已初始化。
    //  netpollWaiters:记录当前goroutine被挂在epoll中的等待数量。
    //  pollUntil:>0.下个timer触发的而时间点。
    //  atomic.Xchg64(&sched.lastpoll, 0):这里是唯一的把sched.lastpoll修改为0的情况,
    //  表示当前需要阻塞式的调用netpoll函数。
    // 这里的atomic.Xchg64(&sched.lastpoll, 0) != 0 具有排他性。只能有一个工作线程处于阻塞等待中。
    if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
        // sched.pollUntil = pollUntil,预计的阻塞时间点。
        atomic.Store64(&sched.pollUntil, uint64(pollUntil))
        if _g_.m.p != 0 {
            throw("findrunnable: netpoll with p")
        }
        if _g_.m.spinning {
            throw("findrunnable: netpoll with spinning")
        }
        // Refresh now.
        now = nanotime()
        delay := int64(-1)
        if pollUntil != 0 {
            // 计算预计阻塞的时间
            delay = pollUntil - now
            if delay < 0 {
                // 触发时间以过,要求立即返回
                delay = 0
            }
        }
        // faketime是自1970年以来模拟的以纳秒为单位的时间。
        // 0值意味着不使用faketime。
        if faketime != 0 {
            // When using fake time, just poll.
            // 当使用 fake time,只是poll。
            delay = 0
        }
        // 阻塞直到有新的work可用,delay是一个具体的时间段
        list := netpoll(delay) // block until new work is available
        atomic.Store64(&sched.pollUntil, 0) // sched.pollUntil = 0
        atomic.Store64(&sched.lastpoll, uint64(now)) // sched.lastpoll = now
        if faketime != 0 && list.empty() {
            // Using fake time and nothing is ready; stop M.
            // When all M's stop, checkdead will call timejump.
            stopm()
            goto top
        }
        lock(&sched.lock)
        // 从空闲P链表中获取一个P
        _p_, _ = pidleget(now) 
        unlock(&sched.lock)
        if _p_ == nil {
            // 没有可用的空闲P时。
            // 如果有就绪的goroutine放入全局goroutine池。
            injectglist(&list)
        } else {
            acquirep(_p_) // 绑定P
            if !list.empty() {
                // 取出一个goroutine,用于返回
                gp := list.pop()
                // 剩余的优先加入全局池。
                injectglist(&list)
                // 修改goroutine状态为待运行状态
                casgstatus(gp, _Gwaiting, _Grunnable)
                if trace.enabled {
                    traceGoUnpark(gp, 0)
                }
                // 返回找到的 goroutine
                return gp, false, false
            }
            // 如果之前M是自旋,则再次标记为自旋并从新再来一次。
            if wasSpinning {
                _g_.m.spinning = true
                atomic.Xadd(&sched.nmspinning, 1)
            }
            goto top
        }
    } else if pollUntil != 0 && netpollinited() {
        // 有其他的线程在阻塞 netpoll。
        // sched.pollUntil:下次timer应该被唤醒时间点。
        pollerPollUntil := int64(atomic.Load64(&sched.pollUntil))
        // timer触发了 或 触发时间已到 叫醒阻塞的netpoll
        if pollerPollUntil == 0 || pollerPollUntil > pollUntil {
            netpollBreak() // 叫醒epoll
        }
    }
    
    // 挂起当前线程等待其他线程唤醒。
    stopm()
    goto top
}

gcstopm()

  1. stopTheWorld停止当前M
  2. TheWordStart时返回。辅助STW
  3. 文件位置:go1.19.3/src/runtime/proc.go
  4. 该方法在GC发起时,其他线程都在这个方法上把自己挂起。
2481
2482
2483
2484
2485
2486
2487
2488
2489
2490
2491
2492
2493
2494
2495
2496
2497
2498
2499
2500
2501
2502
2503
2504
2505
2506
2507
2508
2509
2510
2511
2512
2513
2514
2515
// Stops the current m for stopTheWorld.
// Returns when the world is restarted.
func gcstopm() {
    _g_ := getg() // _g_ = g0

    // sched.gcwaiting:是1,表示当前STW正在等待P停下来。
    if sched.gcwaiting == 0 {
        throw("gcstopm: not waiting for gc")
    }
    // 当前M正处于自旋状态下。
    if _g_.m.spinning {
        _g_.m.spinning = false // 清除自旋标记
        // OK to just drop nmspinning here,
        // startTheWorld will unpark threads as necessary.
        //
        // 自旋计数出现错误
        if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
            throw("gcstopm: negative nmspinning")
        }
    }
    // 解绑当前M与P
    _p_ := releasep()
    lock(&sched.lock)
    // _Pgcstop:GC停止状态。
    // P被STW挂起以执行GC,所有权归执行STW的M所有,执行STW的M会继续使用处于_Pgcstop状态的P。
    _p_.status = _Pgcstop
    // sched.stopwait:记录了STW需要停止的P的数量
    sched.stopwait-- 
    // 已经停下了所有的P,需要唤醒在 sched.stopnote 上发起STW的工作线程。
    if sched.stopwait == 0 {
        notewakeup(&sched.stopnote)
    }
    unlock(&sched.lock)
    stopm() // 停止当前工作线程。
}

releasep()

  1. 解除p和当前m的关联。
  2. 文件位置:go1.19.3/src/runtime/proc.go
4984
4985
4986
4987
4988
4989
4990
4991
4992
4993
4994
4995
4996
4997
4998
4999
5000
5001
5002
5003
5004
5005
5006
5007
// Disassociate p and the current m.
func releasep() *p {
    _g_ := getg() // _g_ = g0

    if _g_.m.p == 0 {
        throw("releasep: invalid arg")
    }
    _p_ := _g_.m.p.ptr() // _p_ = p
    // _p_.m == _g_.m && _p_.status == _Prunning
    if _p_.m.ptr() != _g_.m || _p_.status != _Prunning {
        print("releasep: m=", _g_.m, " m->p=", _g_.m.p.ptr(), " p->m=", hex(_p_.m), " p->status=", _p_.status, "\n")
        throw("releasep: invalid p state")
    }
    if trace.enabled {
        traceProcStop(_g_.m.p.ptr())
    }
    // 解除 p 与 m 相互绑定的关系。
    _g_.m.p = 0
    _p_.m = 0
    // _Pidle:空闲状态。
    // 此时的P没有被用来执行用户代码或调度器代码,通常位于空闲链表中,能够被调度器获取。
    _p_.status = _Pidle
    return _p_
}

stopm()

  1. 停止执行当前m,直到有新的工作可用。返回获取的P
  2. 文件位置:go1.19.3/src/runtime/proc.go
2229
2230
2231
2232
2233
2234
2235
2236
2237
2238
2239
2240
2241
2242
2243
2244
2245
2246
2247
2248
2249
2250
2251
2252
// Stops execution of the current m until new work is available.
// Returns with acquired P.
func stopm() {
    _g_ := getg() // _g_ = g0

    if _g_.m.locks != 0 {
        throw("stopm holding locks")
    }
    if _g_.m.p != 0 {
        throw("stopm holding p")
    }
    if _g_.m.spinning {
        throw("stopm spinning")
    }

    lock(&sched.lock)
    mput(_g_.m) // 把当前m加入sched.midle中。
    unlock(&sched.lock)
    mPark() // 工作线程sleep在m.park上
    
    // 工作线程再次被wakeup时,绑定P。
    acquirep(_g_.m.nextp.ptr()) // 此处p来自m.nextp。
    _g_.m.nextp = 0
}

mput()

  1. mp列入midle列表中。
  2. sched.lock 必须被持有。
  3. 可能在STW期间运行,因此不允许出现写屏障。
  4. 文件位置:go1.19.3/src/runtime/proc.go
5533
5534
5535
5536
5537
5538
5539
5540
5541
5542
5543
5544
5545
5546
// Put mp on midle list.
// sched.lock must be held.
// May run during STW, so write barriers are not allowed.
//
//go:nowritebarrierrec
func mput(mp *m) {
    assertLockHeld(&sched.lock)

    // 把当前m加入sched.midle。
    mp.schedlink = sched.midle
    sched.midle.set(mp)
    sched.nmidle++
    checkdead() // 检查死锁。
}

mPark()

  1. mPark会导致线程自行停驻,一旦被唤醒就会返回。
  2. 文件位置:go1.19.3/src/runtime/proc.go
1452
1453
1454
1455
1456
1457
1458
1459
// mPark causes a thread to park itself, returning once woken.
//
//go:nosplit
func mPark() {
    gp := getg() // gp = g0
    notesleep(&gp.m.park) // sleep
    noteclear(&gp.m.park) // 清除 m.park
}

acquirep()

  1. 关联p和当前m
  2. 文件位置:go1.19.3/src/runtime/proc.go
4938
4939
4940
4941
4942
4943
4944
4945
4946
4947
4948
4949
4950
4951
4952
4953
4954
4955
4956
4957
// Associate p and the current m.
//
// This function is allowed to have write barriers even if the caller
// isn't because it immediately acquires _p_.
//
//go:yeswritebarrierrec
func acquirep(_p_ *p) {
    // Do the part that isn't allowed to have write barriers.
    wirep(_p_)

    // Have p; write barriers now allowed.

    // Perform deferred mcache flush before this P can allocate
    // from a potentially stale mcache.
    _p_.mcache.prepareForSweep()

    if trace.enabled {
        traceProcStart()
    }
}

globrunqget()

  1. 尝试从全局可运行队列中获取一批Gsched.lock必须被持有。
  2. 参数:
    1. _p_ *p:当前工作线程绑定的P
    2. max int32:从全局队列中拿多少个g到本地P中。该参数一般是1,如果是其他P偷取则是大于1。
  3. 返回值*g:从全局队列中那到的goroutine
  4. 文件位置:go1.19.3/src/runtime/proc.go
5601
5602
5603
5604
5605
5606
5607
5608
5609
5610
5611
5612
5613
5614
5615
5616
5617
5618
5619
5620
5621
5622
5623
5624
5625
5626
5627
5628
5629
5630
5631
5632
5633
5634
5635
5636
5637
5638
5639
5640
5641
5642
5643
5644
// Try get a batch of G's from the global runnable queue.
// sched.lock must be held.
func globrunqget(_p_ *p, max int32) *g {
    assertLockHeld(&sched.lock)

    // sched.runqsize:记录的是全局就绪队列的长度。
    // 也就是全局队列goroutine的个数。
    if sched.runqsize == 0 {
        return nil
    }

    // 根据p的数量平分全局运行队列中的goroutines
    n := sched.runqsize/gomaxprocs + 1
    // 上面计算n的方法可能导致n大于全局运行队列中的goroutine数量
    if n > sched.runqsize {
        n = sched.runqsize
    }
    // max:表示最多拿去goroutine个数。
    if max > 0 && n > max {
        n = max
    }
    // 最多只能取本地队列容量的一半。
    // _p_.runq:最大256。
    if n > int32(len(_p_.runq))/2 {
        n = int32(len(_p_.runq)) / 2
    }

    sched.runqsize -= n // 减去取出的数量

    // pop从全局运行队列的队列头取一个goroutine。
    // 这个goroutine用于返回。
    gp := sched.runq.pop()
    n--
    // 遍历从全局队列中拿取goroutine到P的本地队列中。
    for ; n > 0; n-- {
        // 从全局运行队列中取出一个goroutine
        gp1 := sched.runq.pop()
        // 放入本地运行队列,false.放入尾部。
        //  1. go关键字时,调用该方法传入的true
        //  2. 从全局拿取时,这里传入的时false
        runqput(_p_, gp1, false) 
    }
    return gp
}

runqput()

  1. gp放入_p_的尾部。
  2. 参数:
    • _p_ *p:本地P
    • gp *g:需要放入_p_goroutine
    • next booltrue放入本地_p_的开头,false放入本地_p_的尾部。
  3. 文件位置:go1.19.3/src/runtime/proc.go
5780
5781
5782
5783
5784
5785
5786
5787
5788
5789
5790
5791
5792
5793
5794
5795
5796
5797
5798
5799
5800
5801
5802
5803
5804
5805
5806
5807
5808
5809
5810
5811
5812
5813
5814
5815
5816
5817
// runqput tries to put g on the local runnable queue.
// If next is false, runqput adds g to the tail of the runnable queue.
// If next is true, runqput puts g in the _p_.runnext slot.
// If the run queue is full, runnext puts g on the global queue.
// Executed only by the owner P.
func runqput(_p_ *p, gp *g, next bool) {
    if randomizeScheduler && next && fastrandn(2) == 0 {
        next = false
    }

    if next {
    retryNext:
        oldnext := _p_.runnext
        if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
            goto retryNext
        }
        if oldnext == 0 {
            return
        }
        // Kick the old runnext out to the regular run queue.
        gp = oldnext.ptr()
    }

retry:
    h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
    t := _p_.runqtail
    if t-h < uint32(len(_p_.runq)) {
        _p_.runq[t%uint32(len(_p_.runq))].set(gp)
        atomic.StoreRel(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
        return
    }
    // 从_P_中移除一部分到全局中,包含gp。
    if runqputslow(_p_, gp, h, t) {
        return
    }
    // the queue is not full, now the put above must succeed
    goto retry
}

runqget()

  1. 从本地可运行队列中获取goroutine
  2. 如果inheritTimetrue,则gp应继承当前时间片中的剩余时间。否则,它应该开始一个新的时间片。
  3. 多有者由当前P拥有。
  4. 参数:_p_ *p:当前本地P,可能出现其他工作线程M偷取P的情况。
  5. 返回值:
    1. gp *g:当前获取到的goroutine
    2. inheritTime bool:是否继承当前时间片。
  6. 文件位置:go1.19.3/src/runtime/proc.go
5893
5894
5895
5896
5897
5898
5899
5900
5901
5902
5903
5904
5905
5906
5907
5908
5909
5910
5911
5912
5913
5914
5915
5916
5917
5918
5919
5920
5921
5922
5923
5924
5925
5926
5927
5928
5929
5930
5931
5932
5933
5934
5935
// Get g from local runnable queue.
// If inheritTime is true, gp should inherit the remaining time in the
// current time slice. Otherwise, it should start a new time slice.
// Executed only by the owner P.
func runqget(_p_ *p) (gp *g, inheritTime bool) {
    // If there's a runnext, it's the next G to run.
    // 
    // 如果有 runnext,它就是下一个要运行的 G
    next := _p_.runnext
    
    // If the runnext is non-0 and the CAS fails, it could only have been stolen by another P,
    // because other Ps can race to set runnext to 0, but only the current P can set it to non-0.
    // Hence, there's no need to retry this CAS if it falls.
    //
    // 如果 runnext 是 non-0 并且 CAS 失败,它只能被另一个P窃取,因为其他P可以竞相将runnext设置为0,
    // 当前P可以将其设置为非 0。 因此,如果该 CAS 失败,则无需重试。
    if next != 0 && _p_.runnext.cas(next, 0) {
        // 从 p.runnext上取出的goroutine,都继承了上次的时间片
        // channel 的send和recv操作都会把goroutine挂在 p.runnext 上
        return next.ptr(), true
    }

    // p.runnext == 0 || 当前goroutine已被窃取。
    for {
        // 原子读取 _p_.runqhead。
        // 当前P和其他P来偷取goroutine都是从runqhead开始的,因此需要原子读取。
        h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
        // runqtail:只有本地P会修改这个值加入goroutine。当前P在此操作因此runqtail不会改变不需原子操作。
        t := _p_.runqtail
        // 本地P队列为空。
        if t == h {
            return nil, false
        }
        //  取出当前h位置上的g,注意循环队列是通过runqhead和runqtail不断的累加然后通过求余判断位置的
        // 由于runqhead和runqtail都是uint32类型循环数组大小为256正好是整倍数,
        // 因此uint32不断累计最后会从0又开始,形成一个循环
        gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
        // CAS 设置 _p_.runqhead,这段时间可能 _p_.runqhead 的值发生变化而失败。
        if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume
            return gp, false
        }
    }
}

injectglist()

  1. injectglist 将列表上的每个可运行的G添加到某个运行队列,并清除glist
  2. 如果当前不存在P,则将它们添加到全局队列,并启动多达npim个队列来运行它们。
  3. 否则,对于每个空闲的P,将G添加到全局队列,并启动一个m。剩余的G添加到当前P的本地就绪队列。
  4. 这可能会临时获取sched.lock。可以与GC并发运行。
  5. 该函数在netpoll后调用,可以是监控线程中这种情况下没有P,或则调度循环中。
  6. 文件位置:go1.19.3/src/runtime/proc.go
3113
3114
3115
3116
3117
3118
3119
3120
3121
3122
3123
3124
3125
3126
3127
3128
3129
3130
3131
3132
3133
3134
3135
3136
3137
3138
3139
3140
3141
3142
3143
3144
3145
3146
3147
3148
3149
3150
3151
3152
3153
3154
3155
3156
3157
3158
3159
3160
3161
3162
3163
3164
3165
3166
3167
3168
3169
3170
3171
3172
3173
3174
3175
3176
3177
3178
3179
3180
3181
3182
3183
3184
3185
3186
3187
3188
3189
3190
3191
3192
3193
3194
3195
3196
3197
// injectglist adds each runnable G on the list to some run queue,
// and clears glist. If there is no current P, they are added to the
// global queue, and up to npidle M's are started to run them.
// Otherwise, for each idle P, this adds a G to the global queue
// and starts an M. Any remaining G's are added to the current P's
// local run queue.
// This may temporarily acquire sched.lock.
// Can run concurrently with GC.
func injectglist(glist *gList) {
    // goroutine 空列表
    if glist.empty() {
        return
    }
    if trace.enabled {
        for gp := glist.head.ptr(); gp != nil; gp = gp.schedlink.ptr() {
            traceGoUnpark(gp, 0)
        }
    }

    // Mark all the goroutines as runnable before we put them
    // on the run queues.
    head := glist.head.ptr()
    var tail *g // 获取队列最后一个 goroutine
    qsize := 0 // 记录goroutine数量
    // 修改这些goroutine的状态
    for gp := head; gp != nil; gp = gp.schedlink.ptr() {
        tail = gp
        qsize++
        // _Gwaiting:goroutine阻塞在runtime中,没有执行用户代码。它不在任何runq中,但是应该被记录在其他地方。
        // _Grunnable:goroutine应该在某个runq中,当前并没有在运行用户代码,它的栈不归自己所有。
        casgstatus(gp, _Gwaiting, _Grunnable)
    }

    // Turn the gList into a gQueue.
    // 将这个gList转换为一个gQueue。
    var q gQueue // 双向链表
    q.head.set(head)
    q.tail.set(tail)
    *glist = gList{}

    startIdle := func(n int) {
        // 指定数量的空闲P起来工作
        for ; n != 0 && sched.npidle != 0; n-- {
            startm(nil, false)
        }
    }

    pp := getg().m.p.ptr() // pp
    // 如果来自sysmon监控线程,pp = nil。
    if pp == nil {
        lock(&sched.lock)
        // 放入全局 sched.runq 池中
        globrunqputbatch(&q, int32(qsize))
        unlock(&sched.lock)
        // 唤醒qsize多个空闲P来处理这些goroutine
        startIdle(qsize) 
        return
    }

    // 以下是存在P的情况
    
    // 空闲的P的数量
    npidle := int(atomic.Load(&sched.npidle))
    var globq gQueue
    var n int
    // 把空闲P数量个数的goroutine放入全局池中,然后唤醒P起来工作
    for n = 0; n < npidle && !q.empty(); n++ {
        g := q.pop()
        globq.pushBack(g)
    }
    if n > 0 {
        lock(&sched.lock)
        // 放入全局 sched.runq 池中
        globrunqputbatch(&globq, int32(n))
        unlock(&sched.lock)
        // 唤醒qsize多个空闲P来处理这些goroutine
        startIdle(n)
        qsize -= n
    }

    if !q.empty() {
        // 还剩的goroutine放入本地队列中
        runqputbatch(pp, &q, qsize)
    }
}

stealWork()

  1. stealWork试图从任何P中窃取一个可运行的goroutinetimer
  2. 如果返回值newWorktrue,则新工作可能已经准备好了。
  3. 如果now不是0,则为当前时间。stealWork返回经过的时间,如果now被传递为0,则返回当前时间。
  4. 参数now int64:不是 0,则为当前时间。
  5. 返回值:
    1. gp *g:获取到的goroutine
    2. inheritTime bool:是否继承当前时间片。
    3. rnow int64:获取时间点。
    4. pollUntil int64timer的触发时间点。
    5. newWork bool:为 true,则新工作可能已经准备好了。会跳转到findRunnable函数top标签处从新开始。
  6. 窃取逻辑会循环尝试4次,最后一次才会窃取runnexttimer,也就是说前3次只会从其他P的本地runq中窃取。
  7. stealOrder用来实现一个公平的随机窃取顺序,timerpMaskidlepMask用来快速判断指定位置的P是否有timer或是空闲。
  8. 文件位置:go1.19.3/src/runtime/proc.go
2899
2900
2901
2902
2903
2904
2905
2906
2907
2908
2909
2910
2911
2912
2913
2914
2915
2916
2917
2918
2919
2920
2921
2922
2923
2924
2925
2926
2927
2928
2929
2930
2931
2932
2933
2934
2935
2936
2937
2938
2939
2940
2941
2942
2943
2944
2945
2946
2947
2948
2949
2950
2951
2952
2953
2954
2955
2956
2957
2958
2959
2960
2961
2962
2963
2964
2965
2966
2967
2968
2969
2970
2971
2972
2973
2974
2975
2976
2977
2978
2979
2980
2981
2982
2983
2984
2985
2986
2987
2988
2989
2990
2991
2992
2993
2994
2995
2996
2997
2998
2999
3000
3001
3002
3003
3004
3005
3006
3007
3008
3009
3010
3011
3012
3013
3014
3015
3016
3017
// stealWork attempts to steal a runnable goroutine or timer from any P.
//
// If newWork is true, new work may have been readied.
//
// If now is not 0 it is the current time. stealWork returns the passed time or
// the current time if now was passed as 0.
func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) {
    pp := getg().m.p.ptr() // pp = p

    ranTimer := false // newWork 的返回值

    // 尝试偷取最大次数
    const stealTries = 4 
    // 执行四次遍历,就是尽最大努力去其他P中查看
    for i := 0; i < stealTries; i++ {
        // 最后一次时:
        //  1. 前三次尝试去P的runq本地队列中偷取goroutine。
        //  2. 最后一次先去各个P的timers里看看,然后当偷取的P的runq为空时,
        //     尝试偷取P.runnext上的goroutine。
        stealTimersOrRunNextG := i == stealTries-1

        // randomOrder/randomEnum 是随机工作窃取的辅助类型,一轮allp遍历开始。
        // 它们允许以不同的伪随机顺序枚举所有 P 而不重复。
        // 该算法基于这样一个事实:
        //   如果我们有X使得X和GOMAXPROCS互质,那么(i + X)%GOMAXPROCS的序列给出所需的枚举。
        // stealOrder.start(fastrand()):从一个随机位置开始。
        // !enum.done():当前是否已经遍历一圈了。
        // enum.next():跳转到下一个位置。
        for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
            // 有STW正在等待P挂起。直接返回。
            // 跳转到findRunnable函数top标签处从新开始。
            if sched.gcwaiting != 0 {
                // GC work may be available.
                return nil, false, now, pollUntil, true
            }
            
            // 随机选的p,如果是当前的跳过。
            // enum.position():当前偷取P的下标。
            p2 := allp[enum.position()]
            if pp == p2 {
                continue
            }

            // Steal timers from p2. This call to checkTimers is the only place
            // where we might hold a lock on a different P's timers. We do this
            // once on the last pass before checking runnext because stealing
            // from the other P's runnext should be the last resort, so if there
            // are timers to steal do that first.
            //
            // We only check timers on one of the stealing iterations because
            // the time stored in now doesn't change in this loop and checking
            // the timers for each P more than once with the same value of now
            // is probably a waste of time.
            //
            // timerpMask tells us whether the P may have timers at all. If it
            // can't, no need to check at all.
            //
            // 从p2中窃取 timers。对checkTimers的调用是唯一可以对不同P的timers持有锁的地方。
            // 我们在检查runnext之前的最后一遍执行此操作,因为从另一个P的runnext中窃取计时器应该是最后的手段,
            // 所以如果有timers可以窃取,请先窃取。
            // 我们只检查其中一个窃取迭代的定时器,因为存储在now中的时间在这个循环中不会改变,
            // 如果用相同的now值多次检查每个P的定时器,可能就是浪费时间。
            // timerpMask告诉我们P是否有定时器。如果它不能,根本不需要检查。
            // timerpMask 是P的位图记录当前P上是否有 timer。
            if stealTimersOrRunNextG && timerpMask.read(enum.position()) {
                // 再次看看 timers,是否有到点需要执行的timer。
                //   tnow:返回的now
                //   w:触发时间点
                //   ran:timer是否已经运行了
                // 如果ran为true,表示checkTimers()执行了p2的timer,
                // 可能会使某些goroutine变成_Grunnable状态,
                // 所以先检查当前P的本地runq,如果没有找到继续去偷取。
                tnow, w, ran := checkTimers(p2, now)
                now = tnow
                // w != 0:还未触发的时间点
                // pollUntil == 0:上次检查没有timer
                // w < pollUntil:触发时间点缩小
                if w != 0 && (pollUntil == 0 || w < pollUntil) {
                    pollUntil = w // 最近的timer触发的时间点
                }
                // 有触发timer运行,需要去P的本地runq中去找找可能有goroutine被放里面了。
                // 比如time.Sleep。
                if ran {
                    // Running the timers may have
                    // made an arbitrary number of G's
                    // ready and added them to this P's
                    // local run queue. That invalidates
                    // the assumption of runqsteal
                    // that it always has room to add
                    // stolen G's. So check now if there
                    // is a local G to run.
                    if gp, inheritTime := runqget(pp); gp != nil {
                        return gp, inheritTime, now, pollUntil, ranTimer
                    }
                    // 标记为true,再次跑一边调度循环
                    ranTimer = true
                }
            }

            // Don't bother to attempt to steal if p2 is idle.
            // 
            // 如果p2是空闲的不要尝试偷取。
            // 在创建goroutine时候我们遇见过idlepMask,该值是P的位图,记录了所有空闲的P的bit位(原子更新)。
            // idlepMask.read(enum.position()):true.当前P是空闲的,false.当前P不是空闲的。
            if !idlepMask.read(enum.position()) {
                // runqsteal 函数从p2中偷取goroutine到pp中。
                // stealTimersOrRunNextG表示最大程度偷取runnext。
                if gp := runqsteal(pp, p2, stealTimersOrRunNextG); gp != nil {
                    return gp, false, now, pollUntil, ranTimer
                }
            }
        }
    }

    // No goroutines found to steal. Regardless, running a timer may have
    // made some goroutine ready that we missed. Indicate the next timer to
    // wait for.
    return nil, false, now, pollUntil, ranTimer
}

runqsteal()

  1. p2的本地可运行队列中窃取一半的g,并放入p的本地可运行队列中。
  2. 返回一个被窃取的g(如果失败则返回nil)。
  3. 参数:
    • _p_ *p:当前窃取其他P的工作线程绑定的P
    • p2 *p:被窃取的P
    • stealRunNextG booltrue尽最大努力去p.runnext上偷取,false不偷取p.runnextgoroutine
  4. 返回值*g:偷取到的goroutine
  5. 文件位置:go1.19.3/src/runtime/proc.go
6015
6016
6017
6018
6019
6020
6021
6022
6023
6024
6025
6026
6027
6028
6029
6030
6031
6032
6033
6034
6035
6036
6037
6038
6039
6040
6041
6042
6043
// Steal half of elements from local runnable queue of p2
// and put onto local runnable queue of p.
// Returns one of the stolen elements (or nil if failed).
func runqsteal(_p_, p2 *p, stealRunNextG bool) *g {
    t := _p_.runqtail
    // 尝试从p2中偷取一半的goroutine。
    n := runqgrab(p2, &_p_.runq, t, stealRunNextG)
    // p2中也没有
    if n == 0 {
        return nil
    }
    // 从p的本地runq中取出一个goroutine,用于返回给调度器调度起来。
    n--
    // 因为偷取是从runqtail开始的,因此runqtail处也是head头处。
    // 其实是取的队列中的最后一个,方便后面 StoreRel 原子操作设置 runqtail 值。
    gp := _p_.runq[(t+n)%uint32(len(_p_.runq))].ptr()
    if n == 0 {
        return gp
    }
    // 还有其他的需要处理,原子读取 _p_.runqhead
    h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
    // 这里判断偷取的g是否大于P本地的一半数量,则是溢出了
    if t-h+n >= uint32(len(_p_.runq)) {
        throw("runqsteal: runq overflow")
    }
    // 原子设置 _p_.runqtail = t+n
    atomic.StoreRel(&_p_.runqtail, t+n) // store-release, makes the item available for consumption
    return gp
}

runqgrab()

  1. _p_的可运行队列中获取一批goroutinesbatch
  2. Batch是一个从batchHead开始的环形缓冲区。
  3. 返回抓取的goroutines的数量。可以被任意P执行。
  4. 参数:假设从p2偷取到p
    1. _p_ *p:偷取目标的P。就是p2.
    2. batch *[256]guintptr:从_p_偷取goroutine需要放到的Prunq本地队列池。就是p的本地runq池。
    3. batchHead uint32prunqtail处。
    4. stealRunNextG bool:是否近最大努力去runnext上偷取。
  5. 返回值:uint32:偷取goroutine的数量。
  6. 文件位置:go1.19.3/src/runtime/proc.go
5959
5960
5961
5962
5963
5964
5965
5966
5967
5968
5969
5970
5971
5972
5973
5974
5975
5976
5977
5978
5979
5980
5981
5982
5983
5984
5985
5986
5987
5988
5989
5990
5991
5992
5993
5994
5995
5996
5997
5998
5999
6000
6001
6002
6003
6004
6005
6006
6007
6008
6009
6010
6011
6012
6013
6014
6015
6016
6017
6018
6019
6020
6021
6022
6023
6024
6025
6026
6027
6028
6029
6030
6031
6032
6033
6034
6035
6036
6037
6038
6039
6040
6041
6042
6043
6044
6045
6046
6047
6048
// Grabs a batch of goroutines from _p_'s runnable queue into batch.
// Batch is a ring buffer starting at batchHead.
// Returns number of grabbed goroutines.
// Can be executed by any P.
func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
    // 偷取goroutine的代码采用的是自旋方式,而没有采用锁来实现。
    for {
        // _p_ 是 p2,之所以需要原子读取因为p2正在运行有并发的可能。
        h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
        t := atomic.LoadAcq(&_p_.runqtail) // load-acquire, synchronize with the producer
        // 为什么不担心 runqtail 溢出?
        //  因为,runqtail和runqhead都是uint32类型,就算溢出也能保证计算正确。比如 0 - 3 也能得到正确值
        n := t - h
        n = n - n/2 // 计算偷取的数量,为当前容量的一半。
        // p2 中本地runq池中没有goroutine。
        if n == 0 {
            // p2本地runq池为空,尝试去 runnext 获取goroutine。
            if stealRunNextG {
                // Try to steal from _p_.runnext.
                // 
                // 尝试从 _p_.runnext 中偷取。
                if next := _p_.runnext; next != 0 { // _p_.runnext 上有goroutine。
                    if _p_.status == _Prunning { // 当前P正在运行中
                        // Sleep to ensure that _p_ isn't about to run the g
                        // we are about to steal.
                        // The important use case here is when the g running
                        // on _p_ ready()s another g and then almost
                        // immediately blocks. Instead of stealing runnext
                        // in this window, back off to give _p_ a chance to
                        // schedule runnext. This will avoid thrashing gs
                        // between different Ps.
                        // A sync chan send/recv takes ~50ns as of time of
                        // writing, so 3us gives ~50x overshoot.
                        //
                        // Sleep 已确保 _p_ 不会运行我们将要切取的g。
                        // 这里的重要用例是当 g 在 _p_ ready() 上运行时,另一个 g 然后几乎立即阻塞。 
                        // 不要在这个窗口期切取 runnext,而是退而求其次,让_p_有机会调度runnext。
                        // 这将避免g在不同的Ps之间的抖动。
                        // sync chan 的 send/recv 大约需要 ~50ns,所以给出 3us 大约是它的 50x 倍。
                        if GOOS != "windows" && GOOS != "openbsd" && GOOS != "netbsd" {
                            usleep(3) // sleep 3us
                        } else {
                            // On some platforms system timer granularity is
                            // 1-15ms, which is way too much for this
                            // optimization. So just yield.
                            //
                            // 在某些平台上,系统计时器粒度为1-15ms,这对于这种优化来说太过了。所以就屈服吧。
                            osyield() // 在semaphore中有相关的介绍。会尝试让出CPU,让其他优先级更高的线程执行。
                        }
                    }
                    // CAS 操作交换 _p_.runnext 尝试偷取 goroutine。
                    // 大概率会失败从这里直接退出。
                    if !_p_.runnext.cas(next, 0) {
                        continue
                    }
                    // 偷取到goroutine把它放入P的本地runq池。
                    batch[batchHead%uint32(len(batch))] = next
                    return 1
                }
            }
            return 0
        }
        
        // 读取不一致的 h 和 t 值。
        //  小细节:按理说队列中的goroutine个数最多就是len(_p_.runq),所以n的最大值也就是len(_p_.runq)/2,
        // 那为什么需要这个判断呢?
        // 	原因:读取runqhead和runqtail是两个操作而非一个原子操作,当我们读取runqhead之后但还未读取runqtail之前,
        // 如果有其它线程快速的在增加(这是完全有可能的,其它偷取者从队列中偷取goroutine会增加runqhead,
        // 而队列的所有者往队列中添加goroutine会 增加runqtail)这两个值,则会导致我们读取出来的runqtail已经远远大于
        // 我们之前读取出来放在局部变量h里面的runqhead了。
        // 也就是代码注释中所说的h和t已经不一致了,所以这里需要这个if判断来检测异常情况。
        // 如果 n > uint32(len(_p_.runq)/2) 成立说明在t := atomic.LoadAcq(&_p_.runqtail)代码后runqtail发生了变化。
        if n > uint32(len(_p_.runq)/2) { // read inconsistent h and t
            continue
        }
        // 从p2中拷贝goroutine到p的runq本地池
        for i := uint32(0); i < n; i++ {
            g := _p_.runq[(h+i)%uint32(len(_p_.runq))] // 从p2的h处往后取goroutine
            batch[(batchHead+i)%uint32(len(batch))] = g// 从p.runqtail往后最加
        }
        // CAS 原子交换 p2.runqhead 从 h 修改为 h+n,如果失败说明h被修改了
        //  1. 可能其他P也在偷取这个P的goroutine并且偷取成功了。
        //  2. 当前被偷取的这个P可能也在取runqhead出的goroutine来运行。导致runqhead变化。
        // 为什么不担心 runqtail 的值呢?而是只需要保证 runqhead 和 runqtail 一起是原子的呢?
        //  因为,runqtail 只有当前P正最加,并且是递增的,能保证我们要去的数据n。
        if atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume
            return n
        }
    }
}

type randomOrder struct
  1. stealOrder用来实现一个公平的随机窃取顺序。
  2. 文件位置:go1.19.3/src/runtime/proc.go
// 关于取P得算法
var stealOrder randomOrder

// randomOrder/randomEnum are helper types for randomized work stealing.
// They allow to enumerate all Ps in different pseudo-random orders without repetitions.
// The algorithm is based on the fact that if we have X such that X and GOMAXPROCS
// are coprime, then a sequences of (i + X) % GOMAXPROCS gives the required enumeration.
// 
// randomOrder/randomEnum 是随机工作窃取的辅助类型
// 它们允许以不同的伪随机顺序枚举所有 P 而不重复
// 该算法基于这样一个事实:
//    如果我们有 X 使得 X 和 GOMAXPROCS 互质,那么 (i + X) %GOMAXPROCS 的序列给出所需的枚举
type randomOrder struct {
    count    uint32       // 存储当前所有的P数量,也是CPU的核数
    coprimes []uint32     // 存储与count互质数集
}

type randomEnum struct {
    i     uint32  // 从0开始记录遍历的次数
    count uint32  // randomOrder.count
    pos   uint32  // 当前在[0, count-1]范围的下标位置
    inc   uint32  // 当前在randomOrder.coprimes中选取的值
}

// 重置randomOrder
func (ord *randomOrder) reset(count uint32) {
    ord.count = count	                // 记录总个数
    ord.coprimes = ord.coprimes[:0]   // 清空coprimes
    for i := uint32(1); i <= count; i++ {
        if gcd(i, count) == 1 {
            ord.coprimes = append(ord.coprimes, i)
        }
    }
}

// 生成互质数函数
func gcd(a, b uint32) uint32 {
    for b != 0 {
        a, b = b, a%b
    }
    return a
}

// 开始
func (ord *randomOrder) start(i uint32) randomEnum {
    return randomEnum{
        count: ord.count,
        pos:   i % ord.count,
        inc:   ord.coprimes[i%uint32(len(ord.coprimes))],
    }
}

// 当前是否遍历一圈了
func (enum *randomEnum) done() bool {
    return enum.i == enum.count
}

// 下一个互质数
func (enum *randomEnum) next() {
    enum.i++
    enum.pos = (enum.pos + enum.inc) % enum.count	// 这里是随机的选取下一个随机处
}

// 获取当前位置
func (enum *randomEnum) position() uint32 {
    return enum.pos
}

// 盗取算法解释
//  1. 盗取过程用了两个嵌套for循环。
//  2. 内层循环实现了盗取逻辑,从代码可以看出盗取的实质就是遍历allp中的所有p,查看其运行队列是否有goroutine,如果有,
//     则取其一半到当前工作线程的运行队列,然后从findrunnable返回,如果没有则继续遍历下一个p
//  3. 但这里为了保证公平性,遍历allp时并不是固定的从allp[0]即第一个p开始,而是从随机位置上的p开始,
//     而且遍历的顺序也随机化了,并不是现在访问了第i个p下一次就访问第i+1个p,
//     而是使用了一种伪随机的方式遍历allp中的每个p,防止每次遍历时使用同样的顺序访问allp中的元素
// 下面是这个算法的伪代码:
// offset := uint32(random()) % nprocs
// coprime := 随机选取一个小于nprocs且与nprocs互质的数
// for i := 0; i < nprocs; i++ {
//     p := allp[offset]
//     从p的运行队列偷取goroutine
//     if 偷取成功 {
//         break
//     }
//     offset += coprime
//     offset = offset % nprocs
// }
//
// 下面举例说明一下上述算法过程,现假设nprocs为8,也就是一共有8个p
//    如果第一次随机选择的offset = 6,coprime = 3(3与8互质,满足算法要求)的话,则从allp切片中偷取的下标顺序为
//    6, 1, 4, 7, 2, 5, 0, 3,计算过程:
//        6,(6+3)%8=1,(1+3)%8=4, (4+3)%8=7, (7+3)%8=2, (2+3)%8=5, (5+3)%8=0, (0+3)%8=3
//  如果第二次随机选择的offset = 4,coprime = 5的话,则从allp切片中偷取的下标顺序为
//    1, 6, 3, 0, 5, 2, 7, 4,计算过程:
//        1,(1+5)%8=6,(6+5)%8=3, (3+5)%8=0, (0+5)%8=5, (5+5)%8=2, (2+5)%8=7, (7+5)%8=4

releasep()

  1. 解除当前MP的绑定关系。
  2. 文件位置:go1.19.3/src/runtime/proc.go
4984
4985
4986
4987
4988
4989
4990
4991
4992
4993
4994
4995
4996
4997
4998
4999
5000
5001
5002
5003
5004
5005
5006
5007
5008
5009
5010
// Disassociate p and the current m.
func releasep() *p {
    _g_ := getg() // 获取当前运行的g,这里是g0

    if _g_.m.p == 0 { // 当前要解绑的P不存在,系统代码有逻辑问题
        throw("releasep: invalid arg")
    }
    _p_ := _g_.m.p.ptr() // 获取当前工作线程M绑定的P,也就是需要解绑的P
    // _Prunning 表示 P 由 M 拥有并用于运行用户代码或调度程序
    // 只有拥有这个 P 的 M 才允许从 _Prunning 更改 P 的状态
    // M 可以将 P 转换为 _Pidle(如果它没有更多工作要做)、_Psyscall(当进入系统调用时)或 _Pgcstop(停止 GC)
    // M 也可以将 P 的所有权直接交给另一个 M(例如,安排锁定的 G)
    if _p_.m.ptr() != _g_.m || _p_.status != _Prunning { // 当前P绑定的m与g绑定的m不是同一个 或 当前P不是_Prunning状态
        print("releasep: m=", _g_.m, " m->p=", _g_.m.p.ptr(), " p->m=", hex(_p_.m), " p->status=", _p_.status, "\n")
        throw("releasep: invalid p state")
    }
    if trace.enabled {
        traceProcStop(_g_.m.p.ptr())
    }
    _g_.m.p = 0		// 解绑工作线程M与P的关联
    _p_.m = 0		// 解绑P与M的关联
    // _Pidle 表示 P 未用于运行用户代码或调度程序
    // 通常,它位于空闲 P 列表中并且可供调度程序使用,但它可能只是在其他状态之间转换
    // P 由空闲列表或正在转换其状态的任何东西拥有。 它的运行队列是空的
    _p_.status = _Pidle
    return _p_	// 返回当前P
}

pidleput()

  1. pidleput 将p放到_Pidle列表中。
  2. 这释放了p的所有权。一旦sched.lock被释放,使用p就不再安全了。
  3. sched.lock必须被持有,可以在STW期间运行,因此不允许写入屏障。
  4. 文件位置:go1.19.3/src/runtime/proc.go
5696
5697
5698
5699
5700
5701
5702
5703
5704
5705
5706
5707
5708
5709
5710
5711
5712
5713
5714
5715
5716
5717
5718
5719
// pidleput puts p to on the _Pidle list.
//
// This releases ownership of p. Once sched.lock is released it is no longer
// safe to use p.
//
// sched.lock must be held.
//
// May run during STW, so write barriers are not allowed.
//go:nowritebarrierrec
func pidleput(_p_ *p) {
    assertLockHeld(&sched.lock)

    // 判断P的本地队列不应该还有goroutine,判断下
    if !runqempty(_p_) {
        throw("pidleput: P has non-empty run queue")
    }
    // 如果P没有timer,将清除timerpMask位上对应的掩码位,timerpMask是记录忙碌的
    updateTimerPMask(_p_)   // clear if there are no timers.
    idlepMask.set(_p_.id)   // 设置idlepMask对应的P的掩码位,idlepMask是记录空闲的
    _p_.link = sched.pidle  // 当前P记录全局的空闲链表
    sched.pidle.set(_p_)    // 把当前P链接到全局空闲链表后
    // 使用原子锁把当前sched空闲的P数量加1
    atomic.Xadd(&sched.npidle, 1) // TODO: fast atomic
}

runqempty()

  1. runqempty 报告_p_在其本地运行队列中是否没有 Gs。
  2. 它永远不会虚假地返回true。
  3. 文件位置:go1.19.3/src/runtime/proc.go
5752
5753
5754
5755
5756
5757
5758
5759
5760
5761
5762
5763
5764
5765
5766
5767
5768
5769
5770
5771
5772
5773
// runqempty reports whether _p_ has no Gs on its local run queue.
// It never returns true spuriously.
func runqempty(_p_ *p) bool {
    // Defend against a race where 1) _p_ has G1 in runqnext but runqhead == runqtail,
    // 2) runqput on _p_ kicks G1 to the runq, 3) runqget on _p_ empties runqnext.
    // Simply observing that runqhead == runqtail and then observing that runqnext == nil
    // does not mean the queue is empty.
    // 
    // 以下情况:
    //  1. _p_ 在 runqnext 中有 G1 但 runqhead == runqtail
    //  2. _p_ 上的 runqput 将 G1 踢到 runq
    //  3. _p_ 上的 runqget 清空 runqnext
    // 简单地观察 runqhead == runqtail 然后观察 runqnext == nil 并不意味着队列是空的
    for {
        head := atomic.Load(&_p_.runqhead)	
        tail := atomic.Load(&_p_.runqtail)
        runnext := atomic.Loaduintptr((*uintptr)(unsafe.Pointer(&_p_.runnext)))
        if tail == atomic.Load(&_p_.runqtail) {
            return head == tail && runnext == 0
        }
    }
}

pidleget()

  1. 相关联函数pidleget从空闲列表中获取一个P。
  2. 文件位置:go1.19.3/src/runtime/proc.go
5727
5728
5729
5730
5731
5732
5733
5734
5735
5736
5737
5738
5739
5740
5741
5742
5743
5744
5745
5746
5747
5748
5749
5750
// pidleget tries to get a p from the _Pidle list, acquiring ownership.
//
// sched.lock must be held.
//
// May run during STW, so write barriers are not allowed.
//
//go:nowritebarrierrec
func pidleget(now int64) (*p, int64) {
    assertLockHeld(&sched.lock)

    _p_ := sched.pidle.ptr()
    if _p_ != nil {
        // Timer may get added at any time now.
        if now == 0 {
            now = nanotime()
        }
        timerpMask.set(_p_.id)
        idlepMask.clear(_p_.id)
        sched.pidle = _p_.link
        atomic.Xadd(&sched.npidle, -1)
        _p_.limiterEvent.stop(limiterEventIdle, now)
    }
    return _p_, now
}

checkRunqsNoP()

  1. 检查快照中所有的P是否有可以偷取的G
  2. 文件位置:go1.19.3/src/runtime/proc.go
2974
2975
2976
2977
2978
2979
2980
2981
2982
2983
2984
2985
2986
2987
2988
2989
2990
2991
2992
2993
2994
2995
2996
2997
2998
2999
// Check all Ps for a runnable G to steal.
//
// On entry we have no P. If a G is available to steal and a P is available,
// the P is returned which the caller should acquire and attempt to steal the
// work to.
func checkRunqsNoP(allpSnapshot []*p, idlepMaskSnapshot pMask) *p {
    // 变量allp的快照,allp记录了所有的P列表
    for id, p2 := range allpSnapshot {	// 休眠之前在看一下是否有工作要做
        // idlepMaskSnapshot 快照记录着当前P的状态
        // !idlepMaskSnapshot.read(uint32(id)) 当前P状态不为空的 并且 当前P存储groutine的
        if !idlepMaskSnapshot.read(uint32(id)) && !runqempty(p2) {
            lock(&sched.lock)
            pp := pidleget()	// 从空闲的P中拿去一个P,为后续M绑定P做准备,因为有全局的P存在g可以起去拿来用
            unlock(&sched.lock)
            if pp != nil {
                return pp
            }

            // Can't get a P, don't bother checking remaining Ps.
            // 拿不到P,别费心检查剩余的P
            break
        }
    }

    return nil
}

checkIdleGCNoP()

  1. 检查是否有GC需要帮助。
  2. 文件位置:go1.19.3/src/runtime/proc.go
3013
3014
3015
3016
3017
3018
3019
3020
3021
3022
3023
3024
3025
3026
3027
3028
3029
3030
3031
3032
3033
3034
3035
3036
3037
3038
3039
3040
3041
3042
3043
3044
3045
3046
3047
3048
3049
3050
3051
3052
3053
3054
3055
3056
3057
3058
3059
3060
3061
3062
3063
3064
3065
3066
3067
3068
3069
3070
3071
3072
3073
// Check for idle-priority GC, without a P on entry.
//
// If some GC work, a P, and a worker G are all available, the P and G will be
// returned. The returned P has not been wired yet.
func checkIdleGCNoP() (*p, *g) {
    // N.B. Since we have no P, gcBlackenEnabled may change at any time; we
    // must check again after acquiring a P. As an optimization, we also check
    // if an idle mark worker is needed at all. This is OK here, because if we
    // observe that one isn't needed, at least one is currently running. Even if
    // it stops running, its own journey into the scheduler should schedule it
    // again, if need be (at which point, this check will pass, if relevant).
    if atomic.Load(&gcBlackenEnabled) == 0 || !gcController.needIdleMarkWorker() {
        return nil, nil
    }
    if !gcMarkWorkAvailable(nil) {
        return nil, nil
    }

    // Work is available; we can start an idle GC worker only if there is
    // an available P and available worker G.
    //
    // We can attempt to acquire these in either order, though both have
    // synchronization concerns (see below). Workers are almost always
    // available (see comment in findRunnableGCWorker for the one case
    // there may be none). Since we're slightly less likely to find a P,
    // check for that first.
    //
    // Synchronization: note that we must hold sched.lock until we are
    // committed to keeping it. Otherwise we cannot put the unnecessary P
    // back in sched.pidle without performing the full set of idle
    // transition checks.
    //
    // If we were to check gcBgMarkWorkerPool first, we must somehow handle
    // the assumption in gcControllerState.findRunnableGCWorker that an
    // empty gcBgMarkWorkerPool is only possible if gcMarkDone is running.
    lock(&sched.lock)
    pp, now := pidlegetSpinning(0)
    if pp == nil {
        unlock(&sched.lock)
        return nil, nil
    }

    // Now that we own a P, gcBlackenEnabled can't change (as it requires STW).
    if gcBlackenEnabled == 0 || !gcController.addIdleMarkWorker() {
        pidleput(pp, now)
        unlock(&sched.lock)
        return nil, nil
    }

    node := (*gcBgMarkWorkerNode)(gcBgMarkWorkerPool.pop())
    if node == nil {
        pidleput(pp, now)
        unlock(&sched.lock)
        gcController.removeIdleMarkWorker()
        return nil, nil
    }

    unlock(&sched.lock)

    return pp, node.gp.ptr()
}

checkTimersNoP()

  1. 检查快照中所有的P是否有timer要触发了。
  2. 文件位置:go1.19.3/src/runtime/proc.go
2997
2998
2999
3000
3001
3002
3003
3004
3005
3006
3007
3008
3009
3010
3011
// Check all Ps for a timer expiring sooner than pollUntil.
//
// Returns updated pollUntil value.
func checkTimersNoP(allpSnapshot []*p, timerpMaskSnapshot pMask, pollUntil int64) int64 {
    for id, p2 := range allpSnapshot {
        if timerpMaskSnapshot.read(uint32(id)) {
            w := nobarrierWakeTime(p2)
            if w != 0 && (pollUntil == 0 || w < pollUntil) {
                pollUntil = w
            }
        }
    }

    return pollUntil
}

stopm()

  1. 工作线程进入休眠,等待被其他工作线程唤醒。
  2. 文件位置:go1.19.3/src/runtime/proc.go
2229
2230
2231
2232
2233
2234
2235
2236
2237
2238
2239
2240
2241
2242
2243
2244
2245
2246
2247
2248
2249
2250
// Stops execution of the current m until new work is available.
// Returns with acquired P.
func stopm() {
    _g_ := getg()           // 获取当前工作线程M绑定的g

    if _g_.m.locks != 0 {   // 判断当前工作线程是否有锁为解锁
        throw("stopm holding locks")
    }
    if _g_.m.p != 0 {       // 判断当前工作线程是否还绑定了P,因为前面M与P已经解绑了
        throw("stopm holding p")
    }
    if _g_.m.spinning {     // 判断当前工作线程是否还处于自旋状态标记,应为前面已经取消了该标记
        throw("stopm spinning")
    }

    lock(&sched.lock)       // 锁住全局sched
    mput(_g_.m)             // 把m结构体对象放入sched.midle空闲队列
    unlock(&sched.lock)     // 解锁
    mPark()                 // 进入系统调用进入睡眠
    acquirep(_g_.m.nextp.ptr()) // 工作线程被唤醒后从这里开始执行,给M绑定P
    _g_.m.nextp = 0
}

mput()

  1. 当工作线程空闲时即将进入休眠状态时会判断一次checkdead()
  2. 文件位置:go1.19.3/src/runtime/proc.go
5533
5534
5535
5536
5537
5538
5539
5540
5541
5542
5543
5544
5545
5546
5547
// Put mp on midle list.
// sched.lock must be held.
// May run during STW, so write barriers are not allowed.
//go:nowritebarrierrec
func mput(mp *m) {
    assertLockHeld(&sched.lock) // 检查sched.lock锁

    mp.schedlink = sched.midle  // 当前M记录全局空闲M链表
    sched.midle.set(mp)         // 把当前M追加到全局空闲链表中去
    sched.nmidle++              // 全局空闲M数量加一
    // 检查死锁情况
    // 检查基于运行 M 的数量,如果 0 -> 死锁
    // sched.lock 必须被持有
    checkdead()
}

mget()

  1. 文件位置:go1.19.3/src/runtime/proc.go
5547
5548
5549
5550
5551
5552
5553
5554
5555
5556
5557
5558
5559
5560
// Try to get an m from midle list.
// sched.lock must be held.
// May run during STW, so write barriers are not allowed.
//go:nowritebarrierrec
func mget() *m {
    assertLockHeld(&sched.lock)

    mp := sched.midle.ptr()
    if mp != nil {
        sched.midle = mp.schedlink
        sched.nmidle--
    }
    return mp
}

mPark()

  1. 睡眠函数,mPark()导致线程自行停放,一旦唤醒就返回。
  2. stopm的核心是调用mput把m结构体对象放入sched的midle空闲队列,然后通过notesleep(&m.park)函数让自己进入睡眠状态
  3. note是go runtime实现的一次性睡眠和唤醒机制,一个线程可以通过调用notesleep(*note)进入睡眠状态,而另外一个线程则可以通过notewakeup(*note)把其唤醒
  4. note的底层实现机制跟操作系统相关,不同系统使用不同的机制:
    • 比如linux下使用的futex系统调用。
    • mac下则是使用的pthread_cond_t条件变量。
  5. note对这些底层机制做了一个抽象和封装,这种封装给扩展性带来了很大的好处,比如当睡眠和唤醒功能需要支持新平台时,只需要在note层增加对特定平台的支持即可,不需要修改上层的任何代码。
  6. 回到stopm,当从notesleep函数返回后,需要再次绑定一个p,然后返回到findrunnable函数继续重新寻找可运行的goroutine,一旦找到可运行的goroutine就会返回到schedule函数,并把找到的goroutine调度起来运行,如何把goroutine调度起来运行的代码我们已经分析过了。
  7. 文件位置:go1.19.3/src/runtime/proc.go
1452
1453
1454
1455
1456
1457
1458
// mPark causes a thread to park itself, returning once woken.
//go:nosplit
func mPark() {
    gp := getg()
    notesleep(&gp.m.park)   // 进入休眠状态,这里传入M的park,睡眠在这个上面
    noteclear(&gp.m.park)   // 被其他工作线程唤醒,代码从这里开始执行
}

notesleep()

  1. 实现休眠的函数。
  2. notesleep函数调用futexsleep进入睡眠,这里之所以需要用一个循环,是因为futexsleep有可能意外从睡眠中返回,所以从futexsleep函数返回后还需要检查note.key是否还是0。
  3. 如果是0则表示并不是其它工作线程唤醒了我们,只是futexsleep意外返回了,需要再次调用futexsleep进入睡眠。
  4. 文件位置:go1.19.3/src/runtime/lock_futex.go
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
func notesleep(n *note) {
    gp := getg()    // 获取当前工作线程绑定的g,应该是g0在调度循环过程中被切换到g0了
    if gp != gp.m.g0 {
        throw("notesleep not on g0")
    }
    ns := int64(-1) // 超时时间设置为-1,表示无限期等待
    if *cgo_yield != nil {
        // Sleep for an arbitrary-but-moderate interval to poll libc interceptors.
        // 休眠一个任意但适中的间隔来轮询 libc 拦截器cgo相关的
        ns = 10e6
    }
    // 使用循环,保证不是意外被唤醒
    for atomic.Load(key32(&n.key)) == 0 {
        gp.m.blocked = true                 // blocked表示M在当前的note上被屏蔽
        futexsleep(key32(&n.key), 0, ns)    // 进入休眠函数
        if *cgo_yield != nil {
            asmcgocall(*cgo_yield, nil)
        }
        gp.m.blocked = false
    }
}

futexsleep()

  1. 原子的if(*addr == val)休眠,可能会被虚假唤醒; 这是允许的睡眠时间不要超过ns;ns < 0意味着永远。
  2. 文件位置:go1.19.3/src/runtime/os_linux.go
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// Atomically,
//	if(*addr == val) sleep
// Might be woken up spuriously; that's allowed.
// Don't sleep longer than ns; ns < 0 means forever.
//
//go:nosplit
func futexsleep(addr *uint32, val uint32, ns int64) {
    // Some Linux kernels have a bug where futex of
    // FUTEX_WAIT returns an internal error code
    // as an errno. Libpthread ignores the return value
    // here, and so can we: as it says a few lines up,
    // spurious wakeups are allowed.
    // 
    // 一些 Linux 内核存在一个错误,即 FUTEX_WAIT 的 futex 返回内部错误代码作为 errno
    // Libpthread 忽略了这里的返回值,我们也可以:正如它所说的几行,虚假唤醒是允许的
    if ns < 0 {	// 永久睡眠
        futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, nil, nil, 0)
        return
    }

    var ts timespec
    ts.setNsec(ns)  // 设置时间
    futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, unsafe.Pointer(&ts), nil, 0)
}

futex()

  1. 函数原型:func futex(addr unsafe.Pointer, op int32, val uint32, ts, addr2 unsafe.Pointer, val3 uint32) int32由汇编实现。
  2. futex系统调用为我们提供的功能为如果*addr == val则进入睡眠,否则直接返回
  3. 文件位置:go1.19.3/src/runtime/sys_linux_amd64.s
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
# int64 futex(int32 *uaddr, int32 op, int32 val,
#	struct timespec *timeout, int32 *uaddr2, int32 val2);
TEXT runtime·futex(SB),NOSPLIT,$0
    # 下面的6条指令在为futex系统调用准备参数
    MOVQ    addr+0(FP), DI
    MOVL    op+8(FP), SI
    MOVL    val+12(FP), DX
    MOVQ    ts+16(FP), R10
    MOVQ    addr2+24(FP), R8
    MOVL    val3+32(FP), R9
    
    MOVL    $SYS_futex, AX # 系统调用编号放入AX寄存器
    SYSCALL # 执行futex系统调用进入睡眠,从睡眠中被唤醒后接着执行下一条MOVL指令
    MOVL    AX, ret+40(FP) # 保存系统调用的返回值
    RET

execute() 🚀

  1. gp放到当前M上取运行。该函数从g0栈切换到普通goroutine栈上。
  2. 如果inheritTimetrue,则gp将继承当前时间片中的剩余时间。否则,它将启动一个新的时间片。永远不返回。
  3. 写屏障是允许的,因为这是在几个地方获得P后立即调用的。
  4. 参数:
    1. gp *g:当前调度的goroutine
    2. inheritTime booltrue.继承当前时间片,false.不继承当前时间片。
  5. 文件位置:go1.19.3/src/runtime/proc.go
2508
2509
2510
2511
2512
2513
2514
2515
2516
2517
2518
2519
2520
2521
2522
2523
2524
2525
2526
2527
2528
2529
2530
2531
2532
2533
2534
2535
2536
2537
2538
2539
2540
2541
2542
2543
2544
2545
2546
2547
2548
2549
2550
2551
2552
2553
2554
2555
2556
2557
2558
2559
2560
2561
2562
2563
2564
2565
2566
2567
2568
// Schedules gp to run on the current M.
// If inheritTime is true, gp inherits the remaining time in the
// current time slice. Otherwise, it starts a new time slice.
// Never returns.
//
// Write barriers are allowed because this is called immediately after
// acquiring a P in several places.
//
//go:yeswritebarrierrec
func execute(gp *g, inheritTime bool) {
    // 当前是在系统g0栈
    _g_ := getg() // _g_ = g0

    if goroutineProfile.active {
        // Make sure that gp has had its stack written out to the goroutine
        // profile, exactly as it was when the goroutine profiler first stopped
        // the world.
        tryRecordGoroutineProfile(gp, osyield)
    }
    
    // Assign gp.m before entering _Grunning so running Gs have an
    // M.
    // 
    // 在输入_Grunning之前指定gp.m,以便运行Gs具有m。
    _g_.m.curg = gp // m.curg = gp
    gp.m = _g_.m    // gp.m = m
    // _Grunnable:它当前没有执行用户代码。
    // _Grunning:表示这个goroutine可以执行用户代码。
    casgstatus(gp, _Grunnable, _Grunning) // 修改当前g的状态为运行中
    gp.waitsince = 0    // 设置g被阻塞的大约时间
    // 抢占信号,重复stackguard0 = stackpreempt
    gp.preempt = false
    // const _StackGuard = 928; 
    // 设置当前g栈扩容阈值点。
    gp.stackguard0 = gp.stack.lo + _StackGuard
    // 是否继承当前时间片。具体的抢占在sysmon监控线程中。
    if !inheritTime {
        // 不继承上一个时间片时,调度次数会加一
        _g_.m.p.ptr().schedtick++ // 调度次数加一
    }

    // Check whether the profiler needs to be turned on or off.
    // 
    // 检查分析器是否需要打开或关闭
    hz := sched.profilehz // sched.profilehz:用来设置性能分析的采样频率。
    if _g_.m.profilehz != hz {
        setThreadCPUProfiler(hz)
    }

    if trace.enabled {
        // GoSysExit has to happen when we have a P, but before GoStart.
        // So we emit it here.
        if gp.syscallsp != 0 && gp.sysblocktraced {
            traceGoSysExit(gp.sysexitticks)
        }
        traceGoStart()
    }

    // gogo完成从g0到gp真正的切换
    gogo(&gp.sched)
}

gogo()

  1. gogo()函数完成从g0gp的的切换:CPU执行权的转让以及栈的切换
  2. 函数原型:func gogo(buf *gobuf)
  3. 参数buf *gobuf:需要切换的goroutine的调度信息。
  4. 文件位置:go1.19.3/src/runtime/asm_amd64.s
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
# func gogo(buf *gobuf)
# restore state from Gobuf; longjmp
TEXT runtime·gogo(SB), NOSPLIT, $0-8
    # 1) 取出gobuf信息,里面包含需要调度的信息
    # 取出需要调度的goroutine,gp.sched.g,判断这个goroutine不为nil

    # execute函数在调用gogo时把gp的sched成员的地址作为实参(型参buf)传递了过来
    # 该参数位于FP寄存器所指的位置,所以第一条指令是获取参数
    # buf = &gp.sched; BX = *gobuf
    MOVQ    buf+0(FP), BX # *gobuf
    
    # 把buf的值也就是gp.sched的地址放在了BX寄存器之中
    # 这样便于后面的指令依靠BX寄存器来存取gp.sched的成员
    # 注意这里是间接寻址方式
    # gobuf->g --> dx register
    MOVQ    gobuf_g(BX), DX	# DX = gp.sched.g; *g
    
    # 下面这行代码没有实质作用,检查gp.sched.g是否是nil,如果是nil进程会crash死掉
    # 如果DX为空使用0(DX)形式简介寻址会报错。确保 g 不是 nil。
    MOVQ    0(DX), CX # make sure g != nil
    
    # 2) 使用JMP指令跳转到gogo函数
    JMP gogo<>(SB) # 注意这里使用的是JMP
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
TEXT gogo<>(SB), NOSPLIT, $0
    # 3) 把gp放入TLS中和R14寄存器中

    # 获取当前工作线程M的fs段基址,前面把fs段基址设置成了&m.tls[1]的地址
    get_tls(CX)         # CX = &m.tls[1] = TLS

    # 把DX值也就是需要运行的goroutine的指针写入线程本地存储之中
    # 运行这条指令之前,线程本地存储存放的是g0的地址
    MOVQ    DX, g(CX)   # TLS = gp.sched.g

    # 在g1.17和1.18版本中 R14寄存器被用来指向当前goroutine的runtime.g结构
    # R14 = gp.sched.g
    MOVQ    DX, R14     # set the g register

    # 4) 设置栈顶SP寄存器,切gp的栈,切换栈

    # 把CPU的SP寄存器设置为sched.sp,完成了栈的切换,gp.sched.sp记录着g的栈顶位置
    # 设置CPU的栈顶寄存器SP为gp.sched.sp,这条指令完成了栈的切换,从g0的栈切换到了gp的栈
    # rsp = gp.sched.sp
    MOVQ    gobuf_sp(BX), SP    # restore SP

    # 5) 设置 AX = gp.sched.ret
    #    设置 DX = gp.sched.ctxt    闭包上下文
    #    设置 BP = BP = gp.sched.bp 栈底寄存器

    # 下面三条同样是恢复调度上下文到CPU相关寄存器
    # 需要返回的地址
    MOVQ    gobuf_ret(BX), AX   # AX = gp.sched.ret		
    # 上下文环境,也就是当前注册函数的闭包捕获层,funcval地址处
    MOVQ    gobuf_ctxt(BX), DX  # DX = gp.sched.ctxt	
    # 设置栈基地址
    MOVQ    gobuf_bp(BX), BP    # BP = gp.sched.bp		

    # 6) 清空gp的gobuf.sp、gobuf.ret、gobuf.ctxt、gobuf.bp

    # 清空gp.sched中不再需要的值,因为我们已把相关值放入CPU对应的寄存器了
    # clear to help garbage collector
    MOVQ    $0, gobuf_sp(BX)    # g.gobuf.sp = 0
    MOVQ    $0, gobuf_ret(BX)   # g.gobuf.ret = 0
    MOVQ    $0, gobuf_ctxt(BX)  # g.gobuf.ctxt = 0
    MOVQ    $0, gobuf_bp(BX)    # g.gobuf.bp = 0

    # 7) 跳转到g.gobuf.pc执行gp的相关代码
    # 注意:从g0切换的gp过程中,并没有保存g0的相关栈信息

    # gp.sched.pc 记录着注册函数开始的代码地址
    # 把gp.sched.pc的值读取到BX寄存器,这个pc值是gp这个goroutine马上需要执行的第一条指令的地址
    # 对于runtime.main这个场景来说它现在就是runtime.main函数的第一条指令,现在这条指令的地址就放在BX寄存器里面
    # DX寄存器作为上下文,记录着闭包函数的相关捕获变量
    MOVQ    gobuf_pc(BX), BX    # BX = g.gobuf.pc

    # 这里的JMP BX指令把BX寄存器里面的指令地址放入CPU的rip寄存器
    # 于是,CPU就会跳转到该地址继续执行属于gp这个goroutine的代码,这样就完成了goroutine的切换
    # 还需要注意的是:DX寄存器的值 gp.sched.ctxt 存储的是闭包的上下文信息,如果闭包捕获了变量则会使用它。
    JMP BX  # 跳转到注册的goroutine去执行了
  • 总结:
    1. execute()函数完成从系统栈g0切换到普通goroutine的过程(这里是参数gp)。
    2. 该函数将工作线程mgp相互绑定,然后切换gp的状态并设置栈溢出相关参数,以及设置调度次数,
    3. 接着调用gogo()函数把栈从g0切换成gp的栈,并把gp的调度信息保存在相关寄存器中,然后切换到gp开始执行相关代码。
    4. 这里需要注意的是,从g0切换到gp过程中并没有保存g0的相关栈顶SP相关信息,因此g0栈总是从一个固定的开始的位置开始的。

其他相关函数

checkTimers()

  1. 该函数是唤醒time.Sleeptime.Timer等的相关函数,查看runtime.timer文档。
  2. 文件位置:go1.19.3/src/runtime/proc.go
3269
3270
3271
3272
3273
3274
3275
3276
3277
3278
3279
3280
3281
3282
3283
3284
3285
3286
3287
3288
3289
3290
3291
3292
3293
3294
3295
3296
3297
3298
3299
3300
3301
3302
3303
3304
3305
3306
3307
3308
3309
3310
3311
3312
3313
3314
3315
3316
3317
3318
3319
3320
3321
3322
3323
3324
3325
3326
3327
3328
3329
3330
3331
3332
3333
3334
3335
3336
3337
3338
3339
3340
3341
3342
3343
3344
// checkTimers runs any timers for the P that are ready.
// If now is not 0 it is the current time.
// It returns the passed time or the current time if now was passed as 0.
// and the time when the next timer should run or 0 if there is no next timer,
// and reports whether it ran any timers.
// If the time when the next timer should run is not 0,
// it is always larger than the returned time.
// We pass now in and out to avoid extra calls of nanotime.
//
//go:yeswritebarrierrec
func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
    // If it's not yet time for the first timer, or the first adjusted
    // timer, then there is nothing to do.
    next := int64(atomic.Load64(&pp.timer0When))
    nextAdj := int64(atomic.Load64(&pp.timerModifiedEarliest))
    if next == 0 || (nextAdj != 0 && nextAdj < next) {
        next = nextAdj
    }

    if next == 0 {
        // No timers to run or adjust.
        // 无需运行或调整计时器
        return now, 0, false
    }

    if now == 0 {
        now = nanotime()
    }
    if now < next {
        // Next timer is not ready to run, but keep going
        // if we would clear deleted timers.
        // This corresponds to the condition below where
        // we decide whether to call clearDeletedTimers.
        if pp != getg().m.p.ptr() || int(atomic.Load(&pp.deletedTimers)) <= int(atomic.Load(&pp.numTimers)/4) {
            return now, next, false
        }
    }

    lock(&pp.timersLock)

    // 如果当前P的timers存在数据
    if len(pp.timers) > 0 {	// pp.timers记录着当前P中所有相关的timer
        // adjusttimers 在当前 P 的堆中查找任何已修改为更早运行的定时器,并将它们放在堆中的正确位置
        // 在查找这些计时器时,它还会移动已修改为稍后运行的计时器,并删除已删除的计时器
        // 调用者必须锁定 pp 的计时器
        adjusttimers(pp, now)
        for len(pp.timers) > 0 {
            // Note that runtimer may temporarily unlock
            // pp.timersLock.
            // 请注意,runtimer 可能会暂时解锁 pp.timersLock
            // runtimer函数
            // runtimer 检查timers中的第一个timer。 如果它基于now准备好,它会运行timer并删除或更新它
            // 如果它运行了一个timer,则返回 0,如果没有更多的timer,则返回 -1,或者第一个timer应该运行的时间
            if tw := runtimer(pp, now); tw != 0 {
                if tw > 0 {
                    pollUntil = tw
                }
                break
            }
            ran = true
        }
    }

    // If this is the local P, and there are a lot of deleted timers,
    // clear them out. We only do this for the local P to reduce
    // lock contention on timersLock.
    // 如果当前P是当前工作线程绑定的P,并且有很多被删除的timer,清除它们
    // 我们只对本地 P 这样做,以减少 timersLock 上的锁争用
    if pp == getg().m.p.ptr() && int(atomic.Load(&pp.deletedTimers)) > len(pp.timers)/4 {
        clearDeletedTimers(pp)
    }

    unlock(&pp.timersLock)

    return now, pollUntil, ran
}

checkdead()

  1. 检查死锁情况。检查基于正在运行的M的数量,如果0 -> deadlock
  2. sched.lock必须被持有。
  3. 文件位置:go1.19.3/src/runtime/proc.go
5017
5018
5019
5020
5021
5022
5023
5024
5025
5026
5027
5028
5029
5030
5031
5032
5033
5034
5035
5036
5037
5038
5039
5040
5041
5042
5043
5044
5045
5046
5047
5048
5049
5050
5051
5052
5053
5054
5055
5056
5057
5058
5059
5060
5061
5062
5063
5064
5065
5066
5067
5068
5069
5070
5071
5072
5073
5074
5075
5076
5077
5078
5079
5080
5081
5082
5083
5084
5085
5086
5087
5088
5089
5090
5091
5092
5093
5094
5095
5096
5097
5098
5099
5100
5101
5102
5103
5104
5105
5106
5107
5108
5109
5110
5111
5112
5113
5114
5115
5116
5117
5118
5119
5120
5121
5122
5123
5124
5125
5126
// Check for deadlock situation.
// The check is based on number of running M's, if 0 -> deadlock.
// sched.lock must be held.
func checkdead() {
    // sched.lock 必须被持有
    assertLockHeld(&sched.lock)

    // For -buildmode=c-shared or -buildmode=c-archive it's OK if
    // there are no running goroutines. The calling program is
    // assumed to be running.
    if islibrary || isarchive {
        return
    }

    // If we are dying because of a signal caught on an already idle thread,
    // freezetheworld will cause all running threads to block.
    // And runtime will essentially enter into deadlock state,
    // except that there is a thread that will call exit soon.
    if panicking.Load() > 0 {
        return
    }

    // If we are not running under cgo, but we have an extra M then account
    // for it. (It is possible to have an extra M on Windows without cgo to
    // accommodate callbacks created by syscall.NewCallback. See issue #6751
    // for details.)
    var run0 int32
    if !iscgo && cgoHasExtraM {
        mp := lockextra(true)
        haveExtraM := extraMCount > 0
        unlockextra(mp)
        if haveExtraM {
            run0 = 1
        }
    }

    //func mcount() int32 {
    //    return int32(sched.mnext - sched.nmfreed)
    //}
    run := mcount() - sched.nmidle - sched.nmidlelocked - sched.nmsys
    if run > run0 {
        return
    }
    if run < 0 {
        print("runtime: checkdead: nmidle=", sched.nmidle, " nmidlelocked=", sched.nmidlelocked, " mcount=", mcount(), " nmsys=", sched.nmsys, "\n")
        throw("checkdead: inconsistent counts")
    }

    grunning := 0
    forEachG(func(gp *g) {
        if isSystemGoroutine(gp, false) {
            return
        }
        s := readgstatus(gp)
        switch s &^ _Gscan {
        case _Gwaiting,
            _Gpreempted:
            grunning++
        case _Grunnable,
            _Grunning,
            _Gsyscall:
            print("runtime: checkdead: find g ", gp.goid, " in status ", s, "\n")
            throw("checkdead: runnable g")
        }
    })
    if grunning == 0 { // possible if main goroutine calls runtime·Goexit()
        unlock(&sched.lock) // unlock so that GODEBUG=scheddetail=1 doesn't hang
        fatal("no goroutines (main called runtime.Goexit) - deadlock!")
    }

    // Maybe jump time forward for playground.
    if faketime != 0 {
        if when := timeSleepUntil(); when < maxWhen {
            faketime = when

            // Start an M to steal the timer.
            pp, _ := pidleget(faketime)
            if pp == nil {
                // There should always be a free P since
                // nothing is running.
                throw("checkdead: no p for timer")
            }
            mp := mget()
            if mp == nil {
                // There should always be a free M since
                // nothing is running.
                throw("checkdead: no m for timer")
            }
            // M must be spinning to steal. We set this to be
            // explicit, but since this is the only M it would
            // become spinning on its own anyways.
            sched.nmspinning.Add(1)
            mp.spinning = true
            mp.nextp.set(pp)
            notewakeup(&mp.park)
            return
        }
    }

    // There are no goroutines running, so we can look at the P's.
    for _, pp := range allp {
        // 当某个P中存在timer,即使全部P都睡眠了也不会报错
        if len(pp.timers) > 0 {
            return
        }
    }

    unlock(&sched.lock) // unlock so that GODEBUG=scheddetail=1 doesn't hang
    fatal("all goroutines are asleep - deadlock!")
}