2651
2652
2653
2654
2655
2656
2657
2658
2659
2660
2661
2662
2663
2664
2665
2666
2667
2668
2669
2670
2671
2672
2673
2674
2675
2676
2677
2678
2679
2680
2681
2682
2683
2684
2685
2686
2687
2688
2689
2690
2691
2692
2693
2694
2695
2696
2697
2698
2699
2700
2701
2702
2703
2704
2705
2706
2707
2708
2709
2710
2711
2712
2713
2714
2715
2716
2717
2718
2719
2720
2721
2722
2723
2724
2725
2726
2727
2728
2729
2730
2731
2732
2733
2734
2735
2736
2737
2738
2739
2740
2741
2742
2743
2744
2745
2746
2747
2748
2749
2750
2751
2752
2753
2754
2755
2756
2757
2758
2759
2760
2761
2762
2763
2764
2765
2766
2767
2768
2769
2770
2771
2772
2773
2774
2775
2776
2777
2778
2779
2780
2781
2782
2783
2784
2785
2786
2787
2788
2789
2790
2791
2792
2793
2794
2795
2796
2797
2798
2799
2800
2801
2802
2803
2804
2805
2806
2807
2808
2809
2810
2811
2812
2813
2814
2815
2816
2817
2818
2819
2820
2821
2822
2823
2824
2825
2826
2827
2828
2829
2830
2831
2832
2833
2834
2835
2836
2837
2838
2839
2840
2841
2842
2843
2844
2845
2846
2847
2848
2849
2850
2851
2852
2853
2854
2855
2856
2857
2858
2859
2860
2861
2862
2863
2864
2865
2866
2867
2868
2869
2870
2871
2872
2873
2874
2875
2876
2877
2878
2879
2880
2881
2882
2883
2884
2885
2886
2887
2888
2889
2890
2891
2892
2893
2894
2895
2896
2897
2898
2899
2900
2901
2902
2903
2904
2905
2906
2907
2908
2909
2910
2911
2912
2913
2914
2915
2916
2917
2918
2919
2920
2921
2922
2923
2924
2925
2926
2927
2928
2929
2930
2931
2932
2933
2934
2935
2936
2937
2938
2939
2940
2941
2942
2943
2944
2945
2946
2947
2948
2949
2950
2951
2952
2953
2954
2955
2956
2957
2958
2959
2960
2961
2962
2963
2964
2965
2966
2967
2968
2969
2970
2971
2972
2973
2974
2975
2976
2977
2978
2979
2980
2981
2982
2983
2984
2985
2986
2987
2988
2989
2990
2991
2992
2993
2994
2995
2996
2997
2998
2999
3000
3001
3002
3003
3004
3005
3006
3007
3008
3009
3010
3011
3012
3013
3014
3015
3016
3017
3018
3019
3020
3021
3022
3023
3024
3025
3026
3027
3028
3029
3030
3031
3032
3033
3034
3035
3036
3037
3038
3039
3040
3041
3042
3043
3044
3045
3046
3047
3048
3049
3050
3051
3052
3053
3054
3055
3056
3057
3058
3059
3060
3061
3062
3063
3064
3065
3066
3067
3068
3069
3070
3071
3072
3073
3074
3075
3076
3077
3078
3079
3080
3081
3082
3083
3084
3085
3086
3087
3088
3089
3090
3091
3092
3093
3094
3095
3096
3097
3098
3099
3100
3101
3102
3103
3104
3105
3106
3107
3108
3109
3110
3111
3112
3113
3114
3115
3116
3117
3118
3119
3120
3121
3122
|
// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from local or global queue, poll network.
// tryWakeP indicates that the returned goroutine is not normal (GC worker, trace
// reader) so the caller should try to wake a P.
func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
_g_ := getg() // _g_ = g0
// The conditions here and in handoffp must agree: if
// findrunnable would return a G to run, handoffp must start
// an M.
//
// 这里和handoffp中的条件必须一致:如果findrunnable将返回一个G来运行,则handoffp必须启动一个M。
top:
_p_ := _g_.m.p.ptr() // _p_ = p
// 1) 帮助STW,抢占当前P。
// STW 即将开始要求等待,挂起当前M。
// 检测sched.gcwaiting,挂起自己,以便及时响应STW,
// 调度逻辑中很多地方都有对gcwaiting的检测。
if sched.gcwaiting != 0 { // 在GC的STW期间被设置
gcstopm()
goto top
}
// 2) 检查当前P是否到达安全点。
// 如果当前P要求运行 runSafePointFn() 函数。
// runSafePointFn() 函数被GC用来在安全点执行清空工作队列之类的操作。
if _p_.runSafePointFn != 0 {
runSafePointFn()
}
// 3) 去 timers 里看看,是否有到点的定时器。
// 这里如果有 timer 到触发点了,会触发并执行注册函数。
// 由于调度循环是以时间片形式调度的,因此 timer 的触发时间上线就是10ms。
// 因此抢占能抢占超过10ms以上的goroutine?
// checkTimers 为准备好的 P 运行任何timers
// 如果 now 不为 0,则为当前时间,如果 now 被传递为 0,则返回传递的时间或当前时间
// 以及下一个timer应该运行的时间,如果没有下一个计时器,则为 0,并报告它是否运行了任何计时器
// 如果下一个timer应该运行的时间不为0,它总是大于返回的时间
// func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool)
// 参数:
// 1. pp *p 当前需要检查的P
// 2. now int64 当前时间,如果为0则取当前时间
// 返回值:
// 1. rnow int64:参数now的时间。
// 2. pollUntil int64:>0.最近timer触发的时间点。0.没有任何timer。
// 3. ran bool timer里面是否存在已经延迟时间到点的g,true存在,false不存在
// now and pollUntil are saved for work stealing later,
// which may steal timers. It's important that between now
// and then, nothing blocks, so these numbers remain mostly
// relevant.
//
// now和pollUntil被保存以备以后窃取工作,这可能会窃取timers。
// 重要的是,从现在到那时,没有任何阻碍,所以这些数字仍然很重要。
// 处理当前P相关的timers,可能存在一些在timer中的g到时间点了需要放回P中等待被执行
now, pollUntil, _ := checkTimers(_p_, 0)
// 4) 尝试安排 trace reader。
// Try to schedule the trace reader.
//
// 尝试安排 trace reader。
if trace.enabled || trace.shutdown {
gp = traceReader()
if gp != nil {
casgstatus(gp, _Gwaiting, _Grunnable)
traceGoUnpark(gp, 0)
return gp, false, true
}
}
// 5) 写标记期间,尝试安排 GC worker。
// Try to schedule a GC worker.
// 尝试安排 GC worker。
if gcBlackenEnabled != 0 { // 在GC【并发标记】期间被设置
// 唤醒标记协程,参看GC文档
gp, now = gcController.findRunnableGCWorker(_p_, now) // 获取到标记协程
if gp != nil {
return gp, false, true // 返回标记协程
}
}
// 6) P调度次数每满61次需要去全局队列拿去goroutine,防止全局goroutine一直得不到运行。
// Check the global runnable queue once in a while to ensure fairness.
// Otherwise two goroutines can completely occupy the local runqueue
// by constantly respawning each other.
//
// 偶尔检查一次全局可运行队列以确保公平性。
// 否则,两个goroutine可以通过不断地相互重新部署来完全占据本地运行队列。
// 为了保证调度的公平性,每个工作线程每进行61次调度就需要优先从全局运行队列中获取goroutine出来运行。
// 因为如果只调度本地运行队列中的goroutine,则全局运行队列中的goroutine有可能得不到运行。
// p.schedtick:记录调度发生的次数,实际上在每发生一次goroutine切换且不继承时间片的情况下,该字段会加一。
// sched.runqsize:记录的是全局就绪队列的长度。也就是全局队列goroutine的个数。
if _p_.schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock) // 从sched中获取goroutine需要持有lock锁。
// 从全局队列中获取1个goroutine,然后放入P的本地队列。
gp = globrunqget(_p_, 1) // 只拿取一个
unlock(&sched.lock) // mutex 解锁
if gp != nil {
return gp, false, false
}
}
// 7) 是否有 finalizer G。
// Wake up the finalizer G.
//
// 唤醒 finalizer G。
// 该goroutine由runtime.SetFinalizer函数创造。
// 只会创建一个goroutine。
if fingwait && fingwake {
if gp := wakefing(); gp != nil {
ready(gp, 0, true)
}
}
if *cgo_yield != nil {
asmcgocall(*cgo_yield, nil)
}
// 8) 从P的本地队列拿去 goroutine。
// local runq
// 从本地P的队列中获取goroutine。
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime, false
}
// 9) 从全局队列中拿去goroutine。
// global runq
// 从全局队列中获取goroutine。
if sched.runqsize != 0 {
lock(&sched.lock)
gp := globrunqget(_p_, 0) // 拿取多个
unlock(&sched.lock)
if gp != nil {
return gp, false, false
}
}
// 10) netpoll 是否有就绪的goroutine。
// Poll network.
// This netpoll is only an optimization before we resort to stealing.
// We can safely skip it if there are no waiters or a thread is blocked
// in netpoll already. If there is any kind of logical race with that
// blocked thread (e.g. it has already returned from netpoll, but does
// not set lastpoll yet), this thread will do blocking netpoll below
// anyway.
//
// netpollinited():判断netpoll是否已经初始化。
// netpollWaiters:是否有等待的goroutine。
// sched.lastpoll:上次网络轮询的时间点。为0时表示有线程正在阻塞式调用netpoll函数
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
// netpoll(0):判断当前是否有就绪事件,0表示立即返回。非阻塞。
if list := netpoll(0); !list.empty() { // non-blocking
gp := list.pop() // 弹出一个goroutine。
injectglist(&list) // 处理剩下的goroutine
casgstatus(gp, _Gwaiting, _Grunnable) // 修改goroutine状态
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false, false
}
}
// 11) 以上都没获取到goroutine。标记自旋尝试从其他P中偷取goroutine。
// Spinning Ms: steal work from other Ps.
//
// Limit the number of spinning Ms to half the number of busy Ps.
// This is necessary to prevent excessive CPU consumption when
// GOMAXPROCS>>1 but the program parallelism is low.
//
// Spinning Ms:从其他P那里窃取goroutine。
// 将自旋的M限制为繁忙M的一半。
// 这是必要的,以防止在 GOMAXPROCS>>1 但程序并行性较低时过度消耗CPU。
// 如果当前处于spinning状态的M的数量大于忙碌的P的数量的一半,就让当前M阻塞(休眠)。
// 目的是避免在gomaxprocs较大而程序实际的并发性很低的情况下,造成不必要的CPU消耗。
procs := uint32(gomaxprocs) // 获取当前P的数量
// _g_.m.spinning == true:当前M处于自旋。
// 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle):自旋是繁忙的一半还小时,标记当前M为自旋
if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {
if !_g_.m.spinning {
// 满足偷取的时候才会标记M为自旋状态
_g_.m.spinning = true // 标记为自旋状态
atomic.Xadd(&sched.nmspinning, 1) // 累加自旋M的数量
}
// 去其他P中偷取 goroutine。偷取其他P中goroutine的一半。
// 从p.runnext中偷取的goroutine时,inheritTime该值为true,表示继承上个时间片。
gp, inheritTime, tnow, w, newWork := stealWork(now)
now = tnow // 更新当前时间
// 成功偷取到goroutine
if gp != nil {
// Successfully stole.
return gp, inheritTime, false
}
// newWork 某个P中有timer被触发了,在来一次调度循环
if newWork {
// There may be new timer or GC work; restart to
// discover.
goto top
}
// w不为0,表示最近触发的timer的时间点
if w != 0 && (pollUntil == 0 || w < pollUntil) {
// Earlier timer to wait for.
pollUntil = w // 记录最近要触发的时间点
}
}
// 12) 有GC标记工作这去帮助GC
// We have nothing to do.
//
// If we're in the GC mark phase, can safely scan and blacken objects,
// and have work to do, run idle-time marking rather than give up the P.
//
// 我们无事可做。
// 如果我们在GC标记阶段,可以安全地扫描和变黑对象,并且有工作要做,运行空闲时间标记而不是放弃P。
if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) && gcController.addIdleMarkWorker() {
node := (*gcBgMarkWorkerNode)(gcBgMarkWorkerPool.pop())
if node != nil {
_p_.gcMarkWorkerMode = gcMarkWorkerIdleMode
gp := node.gp.ptr()
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false, false
}
gcController.removeIdleMarkWorker()
}
// wasm only:
// If a callback returned and no other goroutine is awake,
// then wake event handler goroutine which pauses execution
// until a callback was triggered.
gp, otherReady := beforeIdle(now, pollUntil) // 在linux下返回 (nil, false)
if gp != nil {
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false, false
}
if otherReady {
goto top
}
// 13) 保存allp、idlepMask和timerpMask的快照
// 当前工作线程即将休眠,休眠前再次去快照里面看看有没有工作要做
// Before we drop our P, make a snapshot of the allp slice,
// which can change underfoot once we no longer block
// safe-points. We don't need to snapshot the contents because
// everything up to cap(allp) is immutable.
//
// 在我们丢弃P之前,做一个allp切片的快照,一旦我们不再阻塞safe-points,它就会改变。
// 我们不需要对内容进行快照,因为cap(allp)之前的所有内容都是不可变的。
allpSnapshot := allp
// Also snapshot masks. Value changes are OK, but we can't allow
// len to change out from under us.
//
// 还有快照掩码。值更改是可以的,但是我们不能允许len从我们下面更改。
idlepMaskSnapshot := idlepMask
timerpMaskSnapshot := timerpMask
// 14) 在看一下gcwaiting和runSafePointFn,以及全局队列sched.runqsize
// return P and block
lock(&sched.lock)
// 有GC等待 或 P有安全点函数执行
if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
unlock(&sched.lock)
goto top
}
// 全局队列有 goroutine
if sched.runqsize != 0 {
// 因为 sched.lock 锁已被持有,所以一定能取出 gp。不为 nil。
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
return gp, false, false
}
// 15) 解除当前M与P的绑定关系,并把P加入全局空闲队列中
// 解除m与p的绑定关系,并设置p为空闲状态。
if releasep() != _p_ {
throw("findrunnable: wrong p")
}
// 把P加入空闲队列
now = pidleput(_p_, now)
unlock(&sched.lock)
// 16) 根据前面快照保存的信息,再次检查其他P是否可偷取,GC有没标记工作需要协助,timer有没触发
// Delicate dance: thread transitions from spinning to non-spinning
// state, potentially concurrently with submission of new work. We must
// drop nmspinning first and then check all sources again (with
// #StoreLoad memory barrier in between). If we do it the other way
// around, another thread can submit work after we've checked all
// sources but before we drop nmspinning; as a result nobody will
// unpark a thread to run the work.
//
// This applies to the following sources of work:
//
// * Goroutines added to a per-P run queue.
// * New/modified-earlier timers on a per-P timer heap.
// * Idle-priority GC work (barring golang.org/issue/19112).
//
// If we discover new work below, we need to restore m.spinning as a signal
// for resetspinning to unpark a new worker thread (because there can be more
// than one starving goroutine). However, if after discovering new work
// we also observe no idle Ps it is OK to skip unparking a new worker
// thread: the system is fully loaded so no spinning threads are required.
// Also see "Worker thread parking/unparking" comment at the top of the file.
wasSpinning := _g_.m.spinning
// 处理当前M是自旋状态
if _g_.m.spinning {
_g_.m.spinning = false // 标记当前M未非自旋
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("findrunnable: negative nmspinning")
}
// Note the for correctness, only the last M transitioning from
// spinning to non-spinning must perform these rechecks to
// ensure no missed work. We are performing it on every M that
// transitions as a conservative change to monitor effects on
// latency. See golang.org/issue/43997.
// Check all runqueues once again.
//
// 再次检查所有运行队列。
// 检查是否有可偷取的P,如果有则取出一个空闲的P绑定M。
_p_ = checkRunqsNoP(allpSnapshot, idlepMaskSnapshot)
if _p_ != nil {
acquirep(_p_) // 绑定P
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
goto top // 有工作可做再跑一遍调度循环
}
// Check for idle-priority GC work again.
//
// 再次检查空闲优先级GC工作。是否有编辑工作需要做
_p_, gp = checkIdleGCNoP()
if _p_ != nil {
acquirep(_p_) // 绑定P
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
// Run the idle worker.
_p_.gcMarkWorkerMode = gcMarkWorkerIdleMode
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false, false
}
// Finally, check for timer creation or expiry concurrently with
// transitioning from spinning to non-spinning.
//
// Note that we cannot use checkTimers here because it calls
// adjusttimers which may need to allocate memory, and that isn't
// allowed when we don't have an active P.
//
// 最后,再看看timer。
pollUntil = checkTimersNoP(allpSnapshot, timerpMaskSnapshot, pollUntil)
}
// 17) network 是否有需要处理的goroutine,或timer是否即将触发,
// 标记sched.lastpoll为0,阻塞式等待吧。netpoll中一些读写超时需要用到timer
// Poll network until next timer.
//
// Poll network 直到下一个 timer。
// netpollinited():netpoll 已初始化。
// netpollWaiters:记录当前goroutine被挂在epoll中的等待数量。
// pollUntil:>0.下个timer触发的而时间点。
// atomic.Xchg64(&sched.lastpoll, 0):这里是唯一的把sched.lastpoll修改为0的情况,
// 表示当前需要阻塞式的调用netpoll函数。
// 这里的atomic.Xchg64(&sched.lastpoll, 0) != 0 具有排他性。只能有一个工作线程处于阻塞等待中。
if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
// sched.pollUntil = pollUntil,预计的阻塞时间点。
atomic.Store64(&sched.pollUntil, uint64(pollUntil))
if _g_.m.p != 0 {
throw("findrunnable: netpoll with p")
}
if _g_.m.spinning {
throw("findrunnable: netpoll with spinning")
}
// Refresh now.
now = nanotime()
delay := int64(-1)
if pollUntil != 0 {
// 计算预计阻塞的时间
delay = pollUntil - now
if delay < 0 {
// 触发时间以过,要求立即返回
delay = 0
}
}
// faketime是自1970年以来模拟的以纳秒为单位的时间。
// 0值意味着不使用faketime。
if faketime != 0 {
// When using fake time, just poll.
// 当使用 fake time,只是poll。
delay = 0
}
// 阻塞直到有新的work可用,delay是一个具体的时间段
list := netpoll(delay) // block until new work is available
atomic.Store64(&sched.pollUntil, 0) // sched.pollUntil = 0
atomic.Store64(&sched.lastpoll, uint64(now)) // sched.lastpoll = now
if faketime != 0 && list.empty() {
// Using fake time and nothing is ready; stop M.
// When all M's stop, checkdead will call timejump.
stopm()
goto top
}
lock(&sched.lock)
// 从空闲P链表中获取一个P
_p_, _ = pidleget(now)
unlock(&sched.lock)
if _p_ == nil {
// 没有可用的空闲P时。
// 如果有就绪的goroutine放入全局goroutine池。
injectglist(&list)
} else {
acquirep(_p_) // 绑定P
if !list.empty() {
// 取出一个goroutine,用于返回
gp := list.pop()
// 剩余的优先加入全局池。
injectglist(&list)
// 修改goroutine状态为待运行状态
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
// 返回找到的 goroutine
return gp, false, false
}
// 如果之前M是自旋,则再次标记为自旋并从新再来一次。
if wasSpinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
goto top
}
} else if pollUntil != 0 && netpollinited() {
// 有其他的线程在阻塞 netpoll。
// sched.pollUntil:下次timer应该被唤醒时间点。
pollerPollUntil := int64(atomic.Load64(&sched.pollUntil))
// timer触发了 或 触发时间已到 叫醒阻塞的netpoll
if pollerPollUntil == 0 || pollerPollUntil > pollUntil {
netpollBreak() // 叫醒epoll
}
}
// 挂起当前线程等待其他线程唤醒。
stopm()
goto top
}
|