Introduction #
In Java, the unit to which CPU time is allocated is the Thread; in Go, it is the goroutine. Java Threads correspond one-to-one with operating system threads; goroutines are lightweight user-level threads implemented by the Go runtime and managed through GPM, with an n:m relationship to operating system threads.
This article aims to analyze source code to study the specific implementation and design ideas of the thread model in Java and Go.
Java Thread #
Hotspot JavaThread #
In the Hotspot VM, java.lang.Thread and the operating system thread are in a 1:1 relationship: the Java thread creates the corresponding operating system thread when it starts, and the operating system thread is reclaimed when the Java thread terminates.
The thread-related code in Hotspot lives in jdk/src/hotspot/share/runtime. In Java 11, JavaThread is defined in thread.hpp and thread.cpp. The source code can be found in jdk-11+28 hotspot runtime
// Class hierarchy
// - Thread
// - NamedThread
// - VMThread
// - ConcurrentGCThread
// - WorkerThread
// - GangWorker
// - GCTaskThread
// - JavaThread
// - various subclasses eg CompilerThread, ServiceThread
// - WatcherThread
// Thread
class Thread: public ThreadShadow {
friend class VMStructs;
friend class JVMCIVMStructs;
private:
GCThreadLocalData _gc_data; // ThreadLocal data in GC
protected:
void* _real_malloc_address; // Real allocation address
protected:
OSThread* _osthread; // The os Thread associated with the Thread
}
// JavaThread
class JavaThread: public Thread {
friend class VMStructs;
friend class JVMCIVMStructs;
friend class WhiteBox;
private:
JavaThread* _next; // The next thread in the thread list
bool _on_thread_list; // Whether the current JavaThread is in the thread list
oop _threadObj; // Java-level thread object
}
In Hotspot for Java 11, there is a parent class Thread, in which _osthread is defined to associate the VM thread with an operating system thread. The following subclasses exist for different purposes ( HotSpot Runtime Overview #Thread Management ):
- NamedThread: a thread representing a uniquely named instance, with _name holding the thread name
- VMThread: the thread responsible for executing VM operations such as scavenge, garbage_collect, etc.
- ConcurrentGCThread: a thread that executes garbage collection concurrently
- WorkerThread: a worker thread with an assigned work ID, stored in _id
- GangWorker: literally a gang worker thread
- GCTaskThread: a worker thread responsible for GC tasks
- JavaThread: corresponds to java.lang.Thread
  - CompilerThread: compiles bytecode to machine code (native code) at runtime
  - ServiceThread: used for low-memory detection and JVMTI (the native interface provided by the virtual machine)
- WatcherThread: responsible for simulating timer interrupts
public:
// Constructor
JavaThread(); // delegating constructor
JavaThread(bool is_attaching_via_jni); // for main thread and JNI attached threads
JavaThread(ThreadFunction entry_point, size_t stack_size = 0);
~JavaThread();
JavaThread::JavaThread(bool is_attaching_via_jni) :
Thread() {
initialize();
if (is_attaching_via_jni) {
_jni_attach_state = _attaching_via_jni;
} else {
_jni_attach_state = _not_attaching_via_jni;
}
assert(deferred_card_mark().is_empty(), "Default MemRegion ctor");
}
JavaThread::JavaThread(ThreadFunction entry_point, size_t stack_sz) :
Thread() {
initialize();
_jni_attach_state = _not_attaching_via_jni;
set_entry_point(entry_point);
// Create the native thread itself.
// %note runtime_23
os::ThreadType thr_type = os::java_thread;
thr_type = entry_point == &compiler_thread_entry ? os::compiler_thread :
os::java_thread;
os::create_thread(this, thr_type, stack_sz);
}
Looking specifically at JavaThread, we can see that it has two non-delegating constructors. JavaThread(bool is_attaching_via_jni) is used only for the main thread and threads attached via JNI, so the constructor used for user threads is JavaThread(ThreadFunction entry_point, size_t stack_size = 0), in which _osthread is created and bound. This is what makes Java Threads and operating system threads correspond one-to-one in the Hotspot virtual machine.
Thread JNI #
JavaThread is defined in C++ in the Hotspot virtual machine, corresponding to the Thread class defined in the JDK. Java calls into C++ through JNI (Java Native Interface). So how does JNI connect these two pieces of code?
Let's look at the source code below to trace the process of creating a JavaThread in the VM, starting from creating a Thread object in Java:
public class Thread implements Runnable {
public synchronized void start() {
if (threadStatus != 0)
throw new IllegalThreadStateException();
group.add(this);
boolean started = false;
try {
// created via the native method
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
}
}
}
private native void start0();
}
As this part shows, the Thread.start() method calls the native method start0() to create an operating system thread. In other words, new Thread() only creates a thread object in the Java program, without any corresponding system resources; only after the start method is called are the corresponding resources created in the virtual machine. The JDK defines the VM functions corresponding to Java's native methods in jdk/src/java.base/share/native/libjava/Thread.c.
Note that this JNI binding is implemented in C in the JDK, not in the source code of the virtual machine. This means that if you are not using the Hotspot virtual machine, you only need to implement the JVM_StartThread method in a new virtual machine based on OpenJDK.
static JNINativeMethod methods[] = {
{"start0", "()V", (void *)&JVM_StartThread},
{"stop0", "(" OBJ ")V", (void *)&JVM_StopThread},
{"isAlive", "()Z", (void *)&JVM_IsThreadAlive},
{"suspend0", "()V", (void *)&JVM_SuspendThread},
{"resume0", "()V", (void *)&JVM_ResumeThread},
{"setPriority0", "(I)V", (void *)&JVM_SetThreadPriority},
{"yield", "()V", (void *)&JVM_Yield},
{"sleep", "(J)V", (void *)&JVM_Sleep},
{"currentThread", "()" THD, (void *)&JVM_CurrentThread},
{"countStackFrames", "()I", (void *)&JVM_CountStackFrames},
{"interrupt0", "()V", (void *)&JVM_Interrupt},
{"isInterrupted", "(Z)Z", (void *)&JVM_IsInterrupted},
{"holdsLock", "(" OBJ ")Z", (void *)&JVM_HoldsLock},
{"getThreads", "()[" THD, (void *)&JVM_GetAllThreads},
{"dumpThreads", "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
{"setNativeName", "(" STR ")V", (void *)&JVM_SetNativeThreadName},
};
So how is JVM_StartThread implemented in the Hotspot virtual machine? Here we need to look at jdk/src/hotspot/share/prims/jvm.cpp
JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))
JVMWrapper("JVM_StartThread");
JavaThread *native_thread = NULL;
bool throw_illegal_thread_state = false;
// We must release the Threads_lock before we can post a jvmti event
// in Thread::start.
{
MutexLocker mu(Threads_lock);
if (java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)) != NULL) {
throw_illegal_thread_state = true;
} else {
jlong size =
java_lang_Thread::stackSize(JNIHandles::resolve_non_null(jthread));
NOT_LP64(if (size > SIZE_MAX) size = SIZE_MAX;)
size_t sz = size > 0 ? (size_t) size : 0;
// Create the JavaThread in Hotspot and also create the corresponding _osthread
native_thread = new JavaThread(&thread_entry, sz);
if (native_thread->osthread() != NULL) {
// Note: the current thread is not being used within "prepare".
native_thread->prepare(jthread);
}
}
}
if (throw_illegal_thread_state) {
THROW(vmSymbols::java_lang_IllegalThreadStateException());
}
assert(native_thread != NULL, "Starting null thread?");
if (native_thread->osthread() == NULL) {
native_thread->smr_delete();
if (JvmtiExport::should_post_resource_exhausted()) {
JvmtiExport::post_resource_exhausted(
JVMTI_RESOURCE_EXHAUSTED_OOM_ERROR | JVMTI_RESOURCE_EXHAUSTED_THREADS,
os::native_thread_creation_failed_msg());
}
THROW_MSG(vmSymbols::java_lang_OutOfMemoryError(),
os::native_thread_creation_failed_msg());
}
// Start execution. Execute os::start_thread
Thread::start(native_thread);
JVM_END
Let's summarize the chain above:
Thread.start() → native start0() → JNI JNINativeMethod → JVM_StartThread → Hotspot JavaThread → osthread
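To see the Java-visible effect of this chain, here is a minimal, illustrative sketch (the class name StartDemo is hypothetical): the Thread object exists as soon as it is constructed, but an OS thread is only created once start() has gone through start0() → JVM_StartThread.
public class StartDemo {
    public static void main(String[] args) throws InterruptedException {
        // new Thread() only builds the Java-level object; no OS thread exists yet
        Thread t = new Thread(() -> System.out.println("running on " + Thread.currentThread().getName()));
        System.out.println(t.isAlive()); // false: start0() has not been called
        t.start(); // start0() -> JVM_StartThread -> new JavaThread -> os::create_thread
        t.join();  // wait for the OS thread to finish; it is then reclaimed
        System.out.println(t.isAlive()); // false again: the thread has terminated
    }
}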
Thread state: JVMTI Thread State #
Thread.class in the JDK and JavaThread in the C++ code correspond one-to-one, and the specific execution and state transitions of a thread are handled by the virtual machine. Therefore the thread state (threadStatus) also needs a counterpart in the Hotspot virtual machine, which is the JVMTI thread state. The corresponding source code is jvmtiThreadState.hpp
- NEW → JVMTI_THREAD_STATE_ALIVE (0x0001) not set
- RUNNABLE → JVMTI_THREAD_STATE_RUNNABLE : 0x0004
- BLOCKED → JVMTI_THREAD_STATE_BLOCKED_ON_MONITOR_ENTER : 0x0400
- WAITING → JVMTI_THREAD_STATE_WAITING_INDEFINITELY : 0x0010
- TIMED_WAITING → JVMTI_THREAD_STATE_WAITING_WITH_TIMEOUT : 0x0020
- TERMINATED → JVMTI_THREAD_STATE_TERMINATED : 0x0002
public class Thread implements Runnable {
/*
* Java thread status for tools, default indicates thread 'not yet started'
*/
private volatile int threadStatus;
public enum State {
// Thread state for a thread which has not yet started.
NEW,
// Thread state for a runnable thread.
RUNNABLE,
// Thread state for a thread blocked waiting for a monitor lock.
BLOCKED,
// Thread state for a waiting thread.
WAITING,
// Thread state for a waiting thread with a specified waiting time.
TIMED_WAITING,
// The thread has completed execution.
TERMINATED;
}
}
public class VM {
/* The threadStatus field is set by the VM at state transition
* in the hotspot implementation. Its value is set according to
* the JVM TI specification GetThreadState function.
*/
private static final int JVMTI_THREAD_STATE_ALIVE = 0x0001;
private static final int JVMTI_THREAD_STATE_TERMINATED = 0x0002;
private static final int JVMTI_THREAD_STATE_RUNNABLE = 0x0004;
private static final int JVMTI_THREAD_STATE_BLOCKED_ON_MONITOR_ENTER = 0x0400;
private static final int JVMTI_THREAD_STATE_WAITING_INDEFINITELY = 0x0010;
private static final int JVMTI_THREAD_STATE_WAITING_WITH_TIMEOUT = 0x0020;
public static Thread.State toThreadState(int threadStatus) {
if ((threadStatus & JVMTI_THREAD_STATE_RUNNABLE) != 0) {
return RUNNABLE;
} else if ((threadStatus & JVMTI_THREAD_STATE_BLOCKED_ON_MONITOR_ENTER) != 0) {
return BLOCKED;
} else if ((threadStatus & JVMTI_THREAD_STATE_WAITING_INDEFINITELY) != 0) {
return WAITING;
} else if ((threadStatus & JVMTI_THREAD_STATE_WAITING_WITH_TIMEOUT) != 0) {
return TIMED_WAITING;
} else if ((threadStatus & JVMTI_THREAD_STATE_TERMINATED) != 0) {
return TERMINATED;
} else if ((threadStatus & JVMTI_THREAD_STATE_ALIVE) == 0) {
return NEW;
} else {
return RUNNABLE;
}
}
}
Thread pool #
ThreadPoolExecutor #
The official Java library provides the ThreadPoolExecutor thread pool in the JDK for thread reuse and thread management. There are already many excellent articles on ThreadPoolExecutor, so here we only highlight a few of the more important parameters and the execution flow of the thread pool.
There are the following three important parameters when creating a ThreadPoolExecutor thread pool:
- corePoolSize: the number of core threads in the thread pool, which refers to the minimum number of threads that must be kept active
- maximumPoolSize: the maximum number of threads the thread pool is allowed to create; when the task queue is full, threads may be created beyond corePoolSize up to this limit
- workQueue: the queue where the thread pool stores tasks, used to store all pending tasks of the thread pool
There are also some internal parameters that are crucial:
- AtomicInteger ctl: the main control field, which packs the thread pool state and the worker count into one 32-bit int. The low 29 bits hold the number of worker threads (up to about 536 million); the high 3 bits (including the sign bit) hold the thread pool state. Updates to ctl are kept thread-safe via CAS plus volatile semantics.
- HashSet<Worker> workers: the set of running worker threads, which must only be accessed while holding mainLock. The number of workers matches the worker count stored in ctl
- ReentrantLock mainLock: the lock of the thread pool; it must be acquired before accessing workers. Note that this is a non-fair lock
The execution process of the thread pool is as follows:
- If the current number of worker threads is less than the core pool size corePoolSize, attempt to create a worker thread and run the task; if successful, return, otherwise go to 2
- Attempt to add the task to the waiting queue workQueue. If enqueuing succeeds, re-check the pool state (existing threads may have died, or the pool may have shut down, since step 1); if the re-check finds no running thread, create a worker thread; otherwise go to 3
- If enqueuing fails, try to create a worker thread directly. If the number of currently running threads does not exceed maximumPoolSize, create the worker and run the task; otherwise invoke the rejection policy handler
public class ThreadPoolExecutor extends AbstractExecutorService {
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet<>();
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private final BlockingQueue<Runnable> workQueue;
private volatile long keepAliveTime;
private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler;
private static final int COUNT_BITS = Integer.SIZE - 3; // 29
// 11111...111 2 ^ 29 - 1
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// The state of the thread pool itself
private static final int RUNNING = -1 << COUNT_BITS; // 111
private static final int SHUTDOWN = 0 << COUNT_BITS; // 000
private static final int STOP = 1 << COUNT_BITS; // 001
private static final int TIDYING = 2 << COUNT_BITS; // 010
private static final int TERMINATED = 3 << COUNT_BITS; // 011
// Remove the first 3 bits of the state to get the number of worker threads
private static int workerCountOf(int c) { return c & COUNT_MASK; }
// rs = run state (RUNNING/SHUTDOWN/STOP/TIDYING/TERMINATED), wc = worker count
// With wc = 0 this yields the run state; with wc = workerCountOf(c) it reconstructs the full ctl value
private static int ctlOf(int rs, int wc) { return rs | wc; }
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
}
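To make the parameters above concrete, here is a minimal usage sketch (the class name PoolDemo, the pool sizes, queue capacity and rejection policy are arbitrary choices for illustration):
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolDemo {
    public static void main(String[] args) {
        // 2 core threads; up to 4 threads once the bounded queue (8 tasks) fills;
        // beyond that, AbortPolicy rejects with RejectedExecutionException
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(8),
                new ThreadPoolExecutor.AbortPolicy());
        for (int i = 0; i < 10; i++) {
            final int id = i;
            pool.execute(() -> System.out.println("task " + id + " on " + Thread.currentThread().getName()));
        }
        pool.shutdown();
    }
}
With 10 tasks, the first 2 occupy the core threads and the remaining 8 fit exactly in the queue, so no extra threads are created and nothing is rejected.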
For more details, please refer to these two articles
ForkJoinPool #
ForkJoinPool is another thread pool provided by the official library, introduced in JDK 1.7 and built on the Fork/Join framework. ForkJoinPool mainly uses two ideas ( A Java Fork/Join Framework ):
- Divide-and-Conquer: recursively divide a task into subtasks, for example splitting a task of size N into K smaller subtasks that are independent of each other and have the same nature as the original problem; wait for the subtasks to complete, then collect and merge their results (Join).
- Work-Stealing: when a worker thread has no local tasks to run, it attempts to “steal” a task from another worker thread. A worker executes subtasks from its own deque in LIFO order, while an idle worker steals from the other end of that deque in FIFO order.
Combined with the ForkJoinPool source, this can be roughly understood as follows (a divide-and-conquer sketch follows the list):
- A Runnable or Callable task submitted by the user to the ForkJoinPool does not necessarily get a dedicated ForkJoinWorkerThread (which corresponds one-to-one with a Thread); it is wrapped as a ForkJoinTask and waits to be executed by an idle or newly created ForkJoinWorkerThread.
- ForkJoinPool and each ForkJoinWorkerThread maintain their own workQueue
- The workQueue array in the ForkJoinPool stores worker-owned queues at odd indices and externally submitted ForkJoinTask queues at even indices
- The workQueue in a ForkJoinWorkerThread stores a ForkJoinTask[] array holding the subtasks that the current worker thread needs to execute
- During execution, a ForkJoinWorkerThread calls the runWorker() method provided by ForkJoinPool and continuously scans the work queues via scan(); an idle worker, or a pool with too few active workers, will attempt to steal ForkJoinTasks from other threads' queues
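As a concrete illustration of Divide-and-Conquer (this is a sketch, not the pool's internals; SumTask and the threshold are illustrative), a RecursiveTask splits a range summation until it is small enough, forks one half onto the worker's own deque where an idle worker may steal it, and joins the results:
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000;
    private final long[] data;
    private final int lo, hi;

    SumTask(long[] data, int lo, int hi) { this.data = data; this.lo = lo; this.hi = hi; }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) { // small enough: compute directly
            long sum = 0;
            for (int i = lo; i < hi; i++) sum += data[i];
            return sum;
        }
        int mid = (lo + hi) >>> 1; // split into two independent subtasks
        SumTask left = new SumTask(data, lo, mid);
        left.fork(); // push onto this worker's deque; may be stolen by an idle worker
        long rightSum = new SumTask(data, mid, hi).compute();
        return rightSum + left.join(); // merge the partial results
    }
}
// Usage: long sum = ForkJoinPool.commonPool().invoke(new SumTask(arr, 0, arr.length));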
[Figure: flowchart of ForkJoinPool execution]
As for other parameters and specific methods, etc., it is recommended to read the following article, which explains in great detail
Virtual Threads #
OpenJDK 19 first introduced the concept of virtual threads. According to the JEP, a virtual thread is a lightweight thread; at the Hotspot VM level its implementation is still based on JavaThread, except that an object pointer _vthread
is added to represent the virtual thread. ( JEP 425: Virtual Threads (Preview), JEP 436: Virtual Threads (Second Preview), JEP 444: Virtual Threads )
So Java already has these thread pools, why do we need Virtual Threads?
Although pooled thread sharing such as ForkJoinPool improves task throughput, it has drawbacks compared to the thread-per-request model. thread-per-request associates one thread with one request, making code easier to understand, debug and maintain, all of which thread sharing complicates. Virtual threads aim to keep that style cheap, bringing clearer code execution and a more easily understood thread structure.
Virtual threads run as daemon threads and are scheduled onto carrier (platform) threads. I won't go into detail here; you can read the official JEPs from OpenJDK.
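As a usage sketch (JDK 21 syntax; the class name VThreadDemo is illustrative), one virtual thread per task makes blocking cheap, because a blocked virtual thread releases its carrier:
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

public class VThreadDemo {
    public static void main(String[] args) {
        // One new virtual thread per task; 10,000 platform threads would be
        // prohibitively expensive, but virtual threads make this routine.
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            IntStream.range(0, 10_000).forEach(i -> executor.submit(() -> {
                Thread.sleep(100); // blocks the virtual thread, not the carrier thread
                return i;
            }));
        } // close() waits for all submitted tasks to complete
    }
}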
Thinking About Massive Throughput? Meet Virtual Threads!
Golang Goroutine #
Compared with operating system threads, which are given a fixed-size stack (commonly 2 MB), the goroutine stack expands dynamically: it starts at about 2 KB and grows as needed while the task executes, up to a maximum of 256 MB on a 32-bit machine and 1 GB on a 64-bit machine. In addition, the Go runtime periodically reclaims memory that is no longer in use, shrinking stack space.
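A quick sketch of why this matters: because each goroutine starts with only ~2 KB of stack, spawning very large numbers of them is routine, whereas the same number of OS threads would exhaust memory.
package main

import (
	"fmt"
	"sync"
)

func main() {
	// 100,000 goroutines at ~2 KB of initial stack each is on the order of
	// a few hundred MB at most; 100,000 OS threads at ~2 MB of stack each
	// would need roughly 200 GB of stack address space.
	var wg sync.WaitGroup
	for i := 0; i < 100000; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
		}()
	}
	wg.Wait()
	fmt.Println("done")
}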
Below, we will talk specifically about GPM, a scheduling model for executing Goroutine.
✨ When organizing this part, I found that there are many excellent articles that explain it very clearly. In this article, I will not expand on the source code in particular detail. Here are two recommended articles
In-depth Analysis of the Go 1.18 GMP Scheduler Internals – Tencent Cloud Developer Community
GPM #
There are three important components in the GPM runtime scheduler
- G: the goroutine, representing a task to be executed, comparable to a Runnable
- M: indicates an operating system thread, i.e. an OS Thread, which is scheduled and managed by the operating system’s scheduler
- P: the processor, which indicates the local scheduler running on the thread
The specific data structure definition is in the source code under the go/src/runtime/runtime2.go path. Only some of the parameters are listed below:
var (
	allm       *m           // Linked list of all m's
	gomaxprocs int32        // Maximum number of p's
	ncpu       int32        // Number of CPUs
	forcegc    forcegcstate // Forced-GC state
	sched      schedt       // Global scheduler
	allp       []*p         // All p's
	allpLock   mutex        // Lock used when reading and writing allp
	idlepMask  pMask        // Bitmask of idle p's, one bit per p
	timerpMask pMask        // Bitmask of p's that may have timers
)
type g struct {
stack stack // goroutine stack memory
_panic *_panic // goroutine internal panic
_defer *_defer // goroutine internal defer
m *m // The m currently used by g
sched gobuf // goroutine scheduling data
param unsafe.Pointer // param is a generic pointer parameter field for passing
atomicstatus atomic.Uint32 // goroutine status
goid uint64 // goroutine ID
waitsince int64 // goroutine blocking time
preempt bool // preemptive signal
...
}
type m struct {
g0 *g // scheduling goroutine that runs on the scheduling stack
morebuf gobuf // gobuf arg to morestack
tls [tlsSlots]uintptr // thread-local storage
curg *g // The g being executed
p puintptr // p for executing go code
nextp puintptr
oldp puintptr // p before executing syscall
mstartfn func() // m start function
id int64
// signal preemption
gsignal *g // signal-handling g
goSigStack gsignalStack // Go-allocated signal handling stack
sigmask sigset // storage for saved signal mask
signalPending atomic.Uint32 // Whether this is a pending preemption signal on this M.
preemptGen atomic.Uint32 // counts the number of completed preemption signals
......
}
type p struct {
id int32
status uint32
m muintptr // back-link to associated m (nil if idle)
mcache *mcache // p's m cache
pcache pageCache
goidcache uint64 // goroutine ids cache
// goroutines queue
runqhead uint32
runqtail uint32
runq [256]guintptr
gcw gcWork // GC execution buffer
wbBuf wbBuf // GC write buffer, may be placed in g in the future
sysmontick sysmontick // Record the signal last sysmon preempted
...
}
type schedt struct {
goidgen atomic.Uint64 // Global goroutine id generation
lock mutex // Global scheduler lock
midle muintptr // Idle m waiting for work
maxmcount int32 // Maximum number of m allowed
pidle puintptr // Idle p
// global runnable queue
runq gQueue
runqsize int32
// dead G global cache to avoid reallocating memory every time a goroutine is created
gFree struct {
lock mutex
stack gList // Gs with stacks
noStack gList // Gs without stacks
n int32
}
// sudog cache, sudog stands for goroutine in the waiting list
sudoglock mutex
sudogcache *sudog
freem *m // list of m waiting to be released
idleTime atomic.Int64 // idle time of p
// sysmon is used for cooperative preemption
sysmonwait atomic.Bool
sysmonnote note
sysmonlock mutex
...
}
Putting the structures above together: p and m are bound one-to-one while running, m is bound to an operating system thread, and runnable g's are scheduled onto p's local queue runq. The global scheduler schedt maintains a global run queue that feeds g's into the p's queues.
In layman's terms, schedt is like a global pool responsible for handing g (tasks) to p (task queues); each p binds an m (OS thread) for actual execution on the operating system side.
The core ideas behind this GPM model are:
- Reuse threads M as much as possible: avoid frequent creation, destruction and switching of operating system threads
- Multicore parallelism: obtain the number of CPU cores and use it to limit the number of P's (and hence simultaneously running M's); the corresponding global variable is gomaxprocs (a small GOMAXPROCS sketch follows this list)
- Work Stealing: an M prioritizes its P's local run queue; if that queue is empty, it steals G's from the P's bound to other M's. The source code is in go/src/runtime/proc.go, and the call path is findRunnable() → stealWork() → runqsteal()
- Hand Off: when an M blocks, the run queue of its P is handed over to another M for execution, implemented by the handoffp() method
- Preemptive scheduling
  - Cooperative timeout-based preemption: to prevent starvation where a new G never obtains an M, any G that has been running for 10ms must give up its M and let other G's execute. The source code is in go/src/runtime/proc.go, and the call path is sysmon() → retake() → preemptone()
  - Signal-based preemption: some workloads, such as tight loops and GC, are not caught by the sysmon scan and therefore cannot be preempted cooperatively, so signal-based preemption exists. The principle is to register a handler for the SIGURG signal and process it in doSigPreempt. The source code is in go/src/runtime/signal_unix.go; the call path is sigtrampgo() → sighandler() → doSigPreempt(), and doSigPreempt modifies the preemptGen and signalPending of the m associated with the g
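Multicore parallelism is directly observable from user code; a trivial sketch:
package main

import (
	"fmt"
	"runtime"
)

func main() {
	// GOMAXPROCS(0) queries the current number of P's without changing it;
	// by default it equals the number of logical CPUs (runtime.NumCPU()).
	fmt.Println("P count:", runtime.GOMAXPROCS(0))
	fmt.Println("CPU count:", runtime.NumCPU())
}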
The source code implementation of these is explained in detail below
Exit/panic scheduling #
Under normal circumstances, when a goroutine exits a syscall, or while the main goroutine waits for panic-triggered deferred functions to finish, rescheduling goes through the Gosched() method. The source code is in go/src/runtime/proc.go; the call path is Gosched() → gosched_m() → goschedImpl(), and the goschedImpl function removes the association between g and m and puts g back on the global scheduler's queue runq.
// src/runtime/proc.go
// Scheduling after exiting the syscall
func exitsyscall() {
...
if sched.disable.user && !schedEnabled(gp) {
// Initiate scheduling
Gosched()
}
...
}
// The main goroutine monitors the goroutine that triggers the panic for defer
func main() {
...
if runningPanicDefers.Load() != 0 {
// Running deferred functions should not take long.
for c := 0; c < 1000; c++ {
if runningPanicDefers.Load() == 0 {
break
}
// Initiate scheduling
Gosched()
}
}
...
}
// Start scheduling
func Gosched() {
checkTimeouts()
mcall(gosched_m)
}
func gosched_m(gp *g) {
if trace.enabled {
traceGoSched()
}
goschedImpl(gp)
}
func goschedImpl(gp *g) {
status := readgstatus(gp)
if status&^_Gscan != _Grunning {
dumpgstatus(gp)
throw("bad g status")
}
casgstatus(gp, _Grunning, _Grunnable)
// Remove the association between g and m
dropg()
lock(&sched.lock)
// Put g back on the global queue runq
globrunqput(gp)
unlock(&sched.lock)
schedule()
}
Cooperative preemptive scheduling – time-out preemption #
Go starts a dedicated monitoring thread, sysmon, from the main goroutine to watch over the goroutines running in the whole GPM model; timeout preemption is an important part of this monitoring. It watches for goroutines that have been running for more than 10ms and preempts them directly. The method call chain is main() → sysmon() → retake() → preemptone(). The specific source code is in go/src/runtime/proc.go.
// src/runtime/proc.go
// The main goroutine.
func main() {
...
if GOARCH != "wasm" { // no threads on wasm yet, so no sysmon
systemstack(func() {
newm(sysmon, nil, -1)
})
}
...
}
func sysmon() {
...
// retake P's blocked in syscalls
// and preempt long running G's
if retake(now) != 0 {
idle = 0
} else {
idle++
}
...
}
const forcePreemptNS = 10 * 1000 * 1000 // 10ms
func retake(now int64) uint32 {
...
for i := 0; i < len(allp); i++ {
pp := allp[i]
if pp == nil {
continue
}
pd := &pp.sysmontick
s := pp.status
sysretake := false
if s == _Prunning || s == _Psyscall {
// Preempt G if it's running for too long.
t := int64(pp.schedtick)
if int64(pd.schedtick) != t {
pd.schedtick = uint32(t)
pd.schedwhen = now
} else if pd.schedwhen+forcePreemptNS <= now { // If it's more than 10ms, preempt
// preempt scheduling
preemptone(pp)
// In case of syscall, preemptone() doesn't
// work, because there is no M wired to P.
sysretake = true
}
}
}
...
}
Non-cooperative preemptive scheduling – signal preemption #
Cooperative timeout-based preemption is flawed (runtime: non-cooperative goroutine preemption #24543): in some scenarios, such as tight loops and garbage collection, it cannot detect threads that have been occupied for a long time. For this reason, signal-based preemption was added in Go 1.14.
To implement signal preemption, preemptGen and signalPending are added to M to track the preemption-signal state, and pendingPreemptSignals is defined globally. The specific source code is in go/src/runtime/signal_unix.go and go/src/runtime/preempt.go; the method call chain is divided into two parts:
- sighandler() → doSigPreempt(), which triggers the modification of the preemption signal state
- suspendG() → preemptM(), which monitors the preemption signal state and performs the actual preemption
// src/runtime/signal_unix.go
// Trigger signal events and modify preempt signals
func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
...
if sig == sigPreempt && debug.asyncpreemptoff == 0 && !delayedSignal {
// Might be a preemption signal.
doSigPreempt(gp, c)
}
...
}
func doSigPreempt(gp *g, ctxt *sigctxt) {
// Check if this G wants to be preempted and is safe to
// preempt.
if wantAsyncPreempt(gp) {
if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
// Adjust the PC and inject a call to asyncPreempt.
ctxt.pushCall(abi.FuncPCABI0(asyncPreempt), newpc)
}
}
// Modify the preemption signal associated with m in g
// Acknowledge the preemption.
gp.m.preemptGen.Add(1)
gp.m.signalPending.Store(0)
if GOOS == "darwin" || GOOS == "ios" {
pendingPreemptSignals.Add(-1)
}
}
// src/runtime/preempt.go
// Monitor preempt signal status and perform preempt actions
func suspendG(gp *g) suspendGState {
......
for i := 0; ; i++ {
switch s := readgstatus(gp); s {
case _Grunning:
// Prepare for asynchronous preemption.
asyncM2 := gp.m
// Get preemptGen in m
asyncGen2 := asyncM2.preemptGen.Load()
needAsync := asyncM != asyncM2 || asyncGen != asyncGen2
asyncM = asyncM2
asyncGen = asyncGen2
// cas to check g status
casfrom_Gscanstatus(gp, _Gscanrunning, _Grunning)
if preemptMSupported && debug.asyncpreemptoff == 0 && needAsync {
now := nanotime()
if now >= nextPreemptM {
nextPreemptM = now + yieldDelay/2
// preempt M
preemptM(asyncM)
}
}
}
...
}
...
}
func preemptM(mp *m) {
if GOOS == "darwin" || GOOS == "ios" {
execLock.rlock()
}
if mp.signalPending.CompareAndSwap(0, 1) {
if GOOS == "darwin" || GOOS == "ios" {
pendingPreemptSignals.Add(1)
}
signalM(mp, sigPreempt)
}
if GOOS == "darwin" || GOOS == "ios" {
execLock.runlock()
}
}
Work stealing #
Work stealing is a core part of the Fork-Join model. We can see that the ForkJoinThread in Java also has this feature. Similarly, the implementation of work stealing in Go is similar to that of the ForkJoinPool in Java, which is to findRunnable
in the global scheduler to find tasks execute, and try to steal it from an idle p. Let’s take a look at the source code, specifically in go/src/runtime/proc.go.
func schedule() {
...
gp, inheritTime, tryWakeP := findRunnable() // blocks until work is available
...
}
func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
...
// Spinning Ms: steal work from other Ps.
if mp.spinning || 2*sched.nmspinning.Load() < gomaxprocs-sched.npidle.Load() {
if !mp.spinning {
mp.becomeSpinning()
}
// Try to steal the goroutine or timer waiting to be executed from any of the p
gp, inheritTime, tnow, w, newWork := stealWork(now)
if gp != nil {
// Successfully stole.
return gp, inheritTime, false
}
...
}
}
Go: Work-Stealing in Go Scheduler
Hand off #
Hand off mainly means releasing the p of an m that is blocked in a syscall or on a lock, and handing that p (with its run queue of g's) over to another m to execute. This is detailed in the handoffp() method of go/src/runtime/proc.go. The following paths lead to handoffp():
- m exit: mexit() → handoffp()
- global monitor: sysmon() → retake() → handoffp()
- entering a blocking syscall: entersyscallblock() → entersyscallblock_handoff() → handoffp()
- exiting a syscall: exitsyscall() → exitsyscall0() → stoplockedm() → handoffp()
- for each p when performing GC: forEachP() → handoffp()
Go keyword compilation #
In Go, a goroutine is created simply by using the go keyword. The actual implementation is that, at compile time, the compiler converts a go statement into a call to the newproc() function. The specific conversion is in go/src/cmd/compile/internal/ssagen/ssa.go.
// src/cmd/compile/internal/ssagen/ssa.go
func (s *state) stmt(n ir.Node) {
...
s.stmtList(n.Init())
switch n.Op() {
case ir.OGO:
n := n.(*ir.GoDeferStmt)
s.callResult(n.Call.(*ir.CallExpr), callGo)
}
}
func (s *state) call(n *ir.CallExpr, k callKind, returnResultAddr bool) *ssa.Value {
...
switch {
case k == callGo:
aux := ssa.StaticAuxCall(ir.Syms.Newproc, s.f.ABIDefault.ABIAnalyzeTypes(nil, ACArgs, ACResults))
call = s.newValue0A(ssa.OpStaticLECall, aux.LateExpansionResultType(), aux)
}
}
The newproc() method creates a goroutine in the _Grunnable state:
// Create a new g running fn.
// Put it on the queue of g's waiting to run.
// The compiler turns a go statement into a call to this.
func newproc(fn *funcval) {
gp := getg()
pc := getcallerpc()
systemstack(func() {
newg := newproc1(fn, gp, pc)
pp := getg().m.p.ptr()
runqput(pp, newg, true)
if mainStarted {
wakep()
}
})
}
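Putting it together, a minimal example: each go statement below compiles into a runtime.newproc call that creates a _Grunnable g and places it on the current p's local run queue via runqput.
package main

import (
	"fmt"
	"sync"
)

func worker(id int, wg *sync.WaitGroup) {
	defer wg.Done()
	fmt.Println("worker", id)
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go worker(i, &wg) // compiled into a call to runtime.newproc
	}
	wg.Wait()
}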
Summary #
Java's design first builds a one-to-one mapping onto the underlying thread model, then layers concurrency tools on top of that foundation, such as the various thread pools provided via Executors. Broadly, the two thread pools are built on the ideas of pooling and Fork/Join. The downside is that developers need to understand them before they can use them elegantly, which raises the difficulty of programming.
In contrast, Go provides GPM, which is also built on the Fork-Join model, but adds monitoring and preemptive scheduling on top and encapsulates this relatively complex design inside the Go runtime. From the user's perspective, concurrency is available simply by using the go keyword. With the scheduling guarantees provided by GPM, developers can achieve high concurrency and high throughput without having to study its design in depth, which lowers the difficulty of programming and greatly simplifies the code.
Other #
The memory model is not covered here for the time being; it will be organized at a later time.