
Java & Go thread model comparison

·5397 words·11 mins
Java language Go language thread management source code analysis
Author
Weaxs

Introduction
#

In Java the unit of CPU scheduling is the Thread; in Go it is the goroutine. Java Threads correspond one-to-one with operating system threads, while goroutines are lightweight user-level threads implemented by the Go runtime and managed through the GPM scheduler, giving an N:M relationship with operating system threads.

This article aims to analyze source code to study the specific implementation and design ideas of the thread model in Java and Go.

Java Thread
#

Hotspot JavaThread
#

In the Hotspot VM, the Java java.lang.Thread and the operating system thread are in a 1:1 relationship. The Java thread will create the corresponding operating system thread when it starts, and the operating system thread will be reclaimed when the Java thread terminates.

The thread-related code in HotSpot lives under jdk/src/hotspot/share/runtime. In Java 11, JavaThread is defined in thread.hpp and thread.cpp; the source can be found in jdk-11+28 hotspot runtime.

// Class hierarchy
// - Thread
//   - NamedThread
//     - VMThread
//     - ConcurrentGCThread
//     - WorkerThread
//       - GangWorker
//       - GCTaskThread
//   - JavaThread
//     - various subclasses eg CompilerThread, ServiceThread
//   - WatcherThread

// Thread
class Thread: public ThreadShadow {
  friend class VMStructs;
  friend class JVMCIVMStructs;
 private:
    GCThreadLocalData _gc_data; // ThreadLocal data in GC
 protected:
    void*       _real_malloc_address; // Real allocation address
 protected:
  OSThread* _osthread; // The os Thread associated with the Thread
};

// JavaThread
class JavaThread: public Thread {
  friend class VMStructs;
  friend class JVMCIVMStructs;
  friend class WhiteBox;
 private:
  JavaThread* _next; // The next thread in the thread list
  bool _on_thread_list; // Whether the current JavaThread is in the thread list
  oop _threadObj; // Java-level thread object
};

In the Java 11 HotSpot sources there is a common parent class Thread, which defines _osthread to associate it with an operating system thread. The following subclasses exist for the different parent classes ( HotSpot Runtime Overview #Thread Management ):

  • NamedThread: a thread with a name that represents a uniquely named instance, defined by _name to indicate the thread name
    • VMThread: a thread responsible for executing VM operations such as scavenge, garbage_collect, etc.
    • ConcurrentGCThread: a thread responsible for concurrently executing garbage collection
    • WorkerThread: refers to a worker thread with an assigned work ID, with _id as the specific work ID
      • GangWorker: literally a gang worker thread
      • GCTaskThread: the worker thread responsible for GC tasks
  • JavaThread: corresponds to java.lang.Thread
    • CompilerThread: the thread responsible for converting compiled bytecode to machine code (native thread) during runtime
    • ServiceThread: used for low memory detection and JVMTI (native interface provided by the virtual machine)
  • WatcherThread: responsible for simulating timer interrupts
public:
  // Constructor
  JavaThread();                            // delegating constructor
  JavaThread(bool is_attaching_via_jni);   // for main thread and JNI attached threads
  JavaThread(ThreadFunction entry_point, size_t stack_size = 0);
  ~JavaThread();

JavaThread::JavaThread(bool is_attaching_via_jni) :
                       Thread() {
  initialize();
  if (is_attaching_via_jni) {
    _jni_attach_state = _attaching_via_jni;
  } else {
    _jni_attach_state = _not_attaching_via_jni;
  }
  assert(deferred_card_mark().is_empty(), "Default MemRegion ctor");
}

JavaThread::JavaThread(ThreadFunction entry_point, size_t stack_sz) :
                       Thread() {
  initialize();
  _jni_attach_state = _not_attaching_via_jni;
  set_entry_point(entry_point);
  // Create the native thread itself.
  // %note runtime_23
  os::ThreadType thr_type = os::java_thread;
  thr_type = entry_point == &compiler_thread_entry ? os::compiler_thread :
                                                     os::java_thread;
  os::create_thread(this, thr_type, stack_sz);
}

Looking specifically at the JavaThread constructors, JavaThread(bool is_attaching_via_jni) is only used for the main thread and for threads attached via JNI. The constructor used for ordinary user threads is therefore JavaThread(ThreadFunction entry_point, size_t stack_size = 0), in which _osthread is created and bound. This is what makes Java threads and operating system threads correspond one-to-one in the HotSpot VM.

Thread JNI
#

JavaThread is defined in C++ in the HotSpot VM and corresponds to the Thread class defined in the JDK. Java calls into C++ through JNI (Java Native Interface). So how does JNI connect these two pieces of code?

Let’s look at the source code below to trace the process from creating a Thread object in Java to creating a JavaThread in the VM.

public class Thread implements Runnable {

	public synchronized void start() {
		if (threadStatus != 0)
            throw new IllegalThreadStateException();

        group.add(this);

        boolean started = false;
        try {
            // create the OS thread via the native method
            start0();
            started = true;
        } finally {
            try {
                if (!started) {
                    group.threadStartFailed(this);
                }
            } catch (Throwable ignore) {
            }
        }
	}
		
	private native void start0();
}

As this shows, Thread.start() calls the native method start0() to create the operating system thread. In other words, new Thread() only creates a thread object in the Java program without any corresponding system resources; only after start() is called are those resources created in the virtual machine. The JDK defines the VM functions corresponding to Java's native methods in jdk/src/java.base/share/native/libjava/Thread.c.

Note that this JNI mapping is implemented in C in the JDK, not in the virtual machine's source code. This means that a virtual machine other than HotSpot, built on OpenJDK, only needs to provide its own implementation of JVM_StartThread.

static JNINativeMethod methods[] = {
    {"start0",           "()V",        (void *)&JVM_StartThread},
    {"stop0",            "(" OBJ ")V", (void *)&JVM_StopThread},
    {"isAlive",          "()Z",        (void *)&JVM_IsThreadAlive},
    {"suspend0",         "()V",        (void *)&JVM_SuspendThread},
    {"resume0",          "()V",        (void *)&JVM_ResumeThread},
    {"setPriority0",     "(I)V",       (void *)&JVM_SetThreadPriority},
    {"yield",            "()V",        (void *)&JVM_Yield},
    {"sleep",            "(J)V",       (void *)&JVM_Sleep},
    {"currentThread",    "()" THD,     (void *)&JVM_CurrentThread},
    {"countStackFrames", "()I",        (void *)&JVM_CountStackFrames},
    {"interrupt0",       "()V",        (void *)&JVM_Interrupt},
    {"isInterrupted",    "(Z)Z",       (void *)&JVM_IsInterrupted},
    {"holdsLock",        "(" OBJ ")Z", (void *)&JVM_HoldsLock},
    {"getThreads",        "()[" THD,   (void *)&JVM_GetAllThreads},
    {"dumpThreads",      "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
    {"setNativeName",    "(" STR ")V", (void *)&JVM_SetNativeThreadName},
};

So how is JVM_StartThread implemented in the HotSpot VM? For that we need to look at jdk/src/hotspot/share/prims/jvm.cpp.

JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))
  JVMWrapper("JVM_StartThread");
  JavaThread *native_thread = NULL;
  bool throw_illegal_thread_state = false;
  // We must release the Threads_lock before we can post a jvmti event
  // in Thread::start.
  {
    MutexLocker mu(Threads_lock);

    if (java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)) != NULL) {
      throw_illegal_thread_state = true;
    } else {
      jlong size =
             java_lang_Thread::stackSize(JNIHandles::resolve_non_null(jthread));
      NOT_LP64(if (size > SIZE_MAX) size = SIZE_MAX;)
      size_t sz = size > 0 ? (size_t) size : 0;

          // Create the JavaThread in Hotspot and also create the corresponding _osthread
      native_thread = new JavaThread(&thread_entry, sz);
      if (native_thread->osthread() != NULL) {
        // Note: the current thread is not being used within "prepare".
        native_thread->prepare(jthread);
      }
    }
  }

  if (throw_illegal_thread_state) {
    THROW(vmSymbols::java_lang_IllegalThreadStateException());
  }

  assert(native_thread != NULL, "Starting null thread?");

  if (native_thread->osthread() == NULL) {
    native_thread->smr_delete();
    if (JvmtiExport::should_post_resource_exhausted()) {
      JvmtiExport::post_resource_exhausted(
        JVMTI_RESOURCE_EXHAUSTED_OOM_ERROR | JVMTI_RESOURCE_EXHAUSTED_THREADS,
        os::native_thread_creation_failed_msg());
    }
    THROW_MSG(vmSymbols::java_lang_OutOfMemoryError(),
              os::native_thread_creation_failed_msg());
  }

    // Start execution. Execute os::start_thread
  Thread::start(native_thread);

JVM_END

Let’s summarize the chain above:

Thread.start() → native start0() → JNI JNINativeMethod mapping → JVM_StartThread → HotSpot JavaThread → osthread

Thread state: JVMTI Thread State
#

The Thread class in the JDK and JavaThread in the C++ code correspond one-to-one, and the actual execution and state transitions of a thread are handled by the virtual machine. The thread state (threadStatus) therefore also needs a counterpart in the HotSpot VM, which is the JVMTI Thread State. The corresponding source code is jvmtiThreadState.hpp

  • NEW → JVMTI_THREAD_STATE_ALIVE : 0x0001 (bit not yet set)
  • RUNNABLE → JVMTI_THREAD_STATE_RUNNABLE : 0x0004
  • BLOCKED → JVMTI_THREAD_STATE_BLOCKED_ON_MONITOR_ENTER : 0x0400
  • WAITING → JVMTI_THREAD_STATE_WAITING_INDEFINITELY : 0x0010
  • TIMED_WAITING → JVMTI_THREAD_STATE_WAITING_WITH_TIMEOUT : 0x0020
  • TERMINATED → JVMTI_THREAD_STATE_TERMINATED : 0x0002
public class Thread implements Runnable {
	/*
     * Java thread status for tools, default indicates thread 'not yet started'
     */
    private volatile int threadStatus;

		public enum State {
				// Thread state for a thread which has not yet started.
        NEW,

        // Thread state for a runnable thread.
        RUNNABLE,

        // Thread state for a thread blocked waiting for a monitor lock.
        BLOCKED,

        // Thread state for a waiting thread.
        WAITING,

        // Thread state for a waiting thread with a specified waiting time.
        TIMED_WAITING,

        // The thread has completed execution.
        TERMINATED;
	}
}

public class VM {
	/* The threadStatus field is set by the VM at state transition
     * in the hotspot implementation. Its value is set according to
     * the JVM TI specification GetThreadState function.
     */
    private static final int JVMTI_THREAD_STATE_ALIVE = 0x0001;
    private static final int JVMTI_THREAD_STATE_TERMINATED = 0x0002;
    private static final int JVMTI_THREAD_STATE_RUNNABLE = 0x0004;
    private static final int JVMTI_THREAD_STATE_BLOCKED_ON_MONITOR_ENTER = 0x0400;
    private static final int JVMTI_THREAD_STATE_WAITING_INDEFINITELY = 0x0010;
    private static final int JVMTI_THREAD_STATE_WAITING_WITH_TIMEOUT = 0x0020;

	public static Thread.State toThreadState(int threadStatus) {
        if ((threadStatus & JVMTI_THREAD_STATE_RUNNABLE) != 0) {
            return RUNNABLE;
        } else if ((threadStatus & JVMTI_THREAD_STATE_BLOCKED_ON_MONITOR_ENTER) != 0) {
            return BLOCKED;
        } else if ((threadStatus & JVMTI_THREAD_STATE_WAITING_INDEFINITELY) != 0) {
            return WAITING;
        } else if ((threadStatus & JVMTI_THREAD_STATE_WAITING_WITH_TIMEOUT) != 0) {
            return TIMED_WAITING;
        } else if ((threadStatus & JVMTI_THREAD_STATE_TERMINATED) != 0) {
            return TERMINATED;
        } else if ((threadStatus & JVMTI_THREAD_STATE_ALIVE) == 0) {
            return NEW;
        } else {
            return RUNNABLE;
        }
    }
}
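
To see this mapping from the Java side, here is a minimal sketch (the class name and the sleep duration are illustrative, not from the original article) that prints the states a thread passes through via Thread.getState():

public class ThreadStateDemo {
    public static void main(String[] args) throws InterruptedException {
        final Object lock = new Object();
        Thread t = new Thread(() -> {
            synchronized (lock) {
                // nothing to do; the thread only contends for the monitor
            }
        });

        System.out.println(t.getState()); // NEW: the JVMTI_THREAD_STATE_ALIVE bit is not yet set

        synchronized (lock) {
            t.start();
            Thread.sleep(100);                // give the OS thread time to reach the monitor (timing is illustrative)
            System.out.println(t.getState()); // BLOCKED: JVMTI_THREAD_STATE_BLOCKED_ON_MONITOR_ENTER
        }

        t.join();
        System.out.println(t.getState());     // TERMINATED: JVMTI_THREAD_STATE_TERMINATED
    }
}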

JavaThread.png

Thread pool
#

ThreadPoolExecutor
#

The JDK provides the ThreadPoolExecutor thread pool for thread reuse and thread management. There are already many excellent articles on ThreadPoolExecutor, so here we only highlight a few of the more important parameters and the thread pool's execution flow.

There are the following three important parameters when creating a ThreadPoolExecutor thread pool:

  • corePoolSize: the number of core threads in the thread pool, which refers to the minimum number of threads that must be kept active
  • maximumPoolSize: the maximum number of threads the pool is allowed to create; threads beyond corePoolSize are only created once the task queue is full
  • workQueue: the queue where the thread pool stores tasks, used to store all pending tasks of the thread pool

There are also some internal parameters that are crucial:

  • AtomicInteger ctl: the main control field, which records both the pool state and the number of worker threads. ctl is a 32-bit integer: the low 29 bits hold the worker count (up to 2^29 − 1, about 536 million), and the high 3 bits (including the sign bit) hold the pool state. Updates to ctl are kept thread-safe through CAS on the volatile AtomicInteger.
  • HashSet workers: the set of running worker threads, which must only be accessed while holding mainLock. The number of workers matches the count stored in ctl.
  • ReentrantLock mainLock: the thread pool's lock, which must be acquired before accessing workers. Note that this is a non-fair lock.

The execution process of the thread pool is as follows:

  1. If the current number of worker threads is less than corePoolSize, try to create a worker thread and execute the task; if that succeeds, return, otherwise go to 2
  2. Try to add the task to the waiting queue workQueue. If enqueuing succeeds, re-check the pool state (worker threads may have died or the pool may have been shut down since the first check): if the pool is no longer running, remove the task and reject it; if there are no worker threads left, create one. If enqueuing fails, go to 3
  3. Try to create a worker thread directly. If the number of running threads does not exceed maximumPoolSize, create the worker and execute the task; otherwise run the rejection policy handler
public class ThreadPoolExecutor extends AbstractExecutorService {
       private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
       private final ReentrantLock mainLock = new ReentrantLock();
       private final HashSet<Worker> workers = new HashSet<>();

       private volatile int corePoolSize;
       private volatile int maximumPoolSize;
       private final BlockingQueue<Runnable> workQueue;
       private volatile long keepAliveTime;
       private volatile ThreadFactory threadFactory;
       private volatile RejectedExecutionHandler handler;

       private static final int COUNT_BITS = Integer.SIZE - 3;      // 29
       // 11111...111   2 ^ 29 - 1
       private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
    
       // The state of the thread pool itself
       private static final int RUNNING    = -1 << COUNT_BITS;    // 111
       private static final int SHUTDOWN   =  0 << COUNT_BITS;    // 000
       private static final int STOP       =  1 << COUNT_BITS;    // 001
       private static final int TIDYING    =  2 << COUNT_BITS;    // 010
       private static final int TERMINATED =  3 << COUNT_BITS;    // 011

       // Remove the first 3 bits of the state to get the number of worker threads
       private static int workerCountOf(int c)  { return c & COUNT_MASK; }
       // rs = run state (RUNNING/SHUTDOWN/STOP/TIDYING/TERMINATED), wc = worker count
       // With wc = 0 this returns just the pool state; with wc = workerCountOf(c) it reconstructs the full ctl value
       private static int ctlOf(int rs, int wc) { return rs | wc; }

       public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
       
}
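
To make the parameters above concrete, here is a minimal usage sketch (the pool sizes, queue capacity and rejection policy are illustrative values, not recommendations): a pool with 2 core threads grows to at most 4 threads once its bounded queue of 8 tasks fills up, and falls back to CallerRunsPolicy when even that is exhausted.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                          // corePoolSize
                4,                                          // maximumPoolSize
                60L, TimeUnit.SECONDS,                      // keepAliveTime for threads above the core size
                new ArrayBlockingQueue<>(8),                // bounded workQueue
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy()); // rejection policy when queue and pool are full

        for (int i = 0; i < 20; i++) {
            final int taskId = i;
            pool.execute(() ->
                    System.out.println("task " + taskId + " on " + Thread.currentThread().getName()));
        }
        pool.shutdown();
    }
}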

ThreadPoolExecutor.png

For more details, please refer to these two articles

JUC线程池: ThreadPoolExecutor详解

Java线程池实现原理及其在美团业务中的实践

ForkJoinPool
#

ForkJoinPool is another thread pool in the standard library, introduced in JDK 1.7. It is built on the Fork/Join framework and rests on two main ideas ( A Java Fork/Join Framework):

  • Divide and conquer (Divide-and-Conquer): recursively split a task into subtasks, for example splitting a task of size N into K smaller subtasks that are independent of each other and have the same structure as the original problem; wait for the subtasks to complete, then collect and merge their results (Join).
  • Work stealing (Work-Stealing): when a worker thread has no local tasks to run, it attempts to “steal” a task from another worker thread. Typically a worker executes subtasks from one end of its own deque, while an idle worker steals tasks from the other end, so the two rarely contend.

In ForkJoinPool this works roughly as follows:

  1. A Runnable or Callable submitted to the ForkJoinPool does not necessarily get a dedicated ForkJoinWorkerThread (which corresponds one-to-one with a Thread); it is wrapped as a ForkJoinTask and waits to be executed by an idle or newly created ForkJoinWorkerThread.
  2. Both the ForkJoinPool and each ForkJoinWorkerThread maintain a workQueue
    • The workQueue array in the ForkJoinPool holds the queues bound to ForkJoinWorkerThreads at odd indices and the ForkJoinTask[] arrays of externally submitted tasks at even indices
    • The workQueue in a ForkJoinWorkerThread holds a ForkJoinTask[] array containing the subtasks to be executed by that worker thread
  3. During execution, a ForkJoinWorkerThread calls the runWorker() method provided by ForkJoinPool and continuously scans the work queues via scan(); a worker that is idle or whose own queue is empty tries to steal ForkJoinTasks from other threads' queues and execute them

Based on this, the flowchart of ForkJoinPool execution is as follows:

ForkJoinPool.png
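
To illustrate the divide-and-conquer side from the user's perspective, here is a minimal RecursiveTask sketch (the SumTask class and the threshold are hypothetical examples, not from the JDK): the range is split until it is small enough, the left half is forked onto the worker's queue (where an idle worker may steal it), and the results are joined.

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000;
    private final long[] data;
    private final int from, to;

    SumTask(long[] data, int from, int to) {
        this.data = data; this.from = from; this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {            // small enough: compute directly
            long sum = 0;
            for (int i = from; i < to; i++) sum += data[i];
            return sum;
        }
        int mid = (from + to) >>> 1;
        SumTask left = new SumTask(data, from, mid);
        SumTask right = new SumTask(data, mid, to);
        left.fork();                             // push the left half onto this worker's queue (stealable)
        long rightResult = right.compute();      // compute the right half in the current worker
        return left.join() + rightResult;        // wait for (or help with) the left half, then merge
    }

    public static void main(String[] args) {
        long[] data = new long[1_000_000];
        for (int i = 0; i < data.length; i++) data[i] = i;
        long sum = ForkJoinPool.commonPool().invoke(new SumTask(data, 0, data.length));
        System.out.println(sum);
    }
}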

For the other parameters and the specific methods, the following article is recommended; it explains them in great detail:

JUC线程池: Fork/Join框架详解

Virtual Threads
#

OpenJDK 19 first introduced the concept of Virtual Threads. As described there, a virtual thread is a lightweight thread; at the HotSpot VM level the implementation is still based on JavaThread, with an added object pointer _vthread representing the virtual thread. ( JEP 425: Virtual Threads (Preview), JEP 436: Virtual Threads (Second Preview), JEP 444: Virtual Threads )

Given that Java already has these thread pools, why do we need Virtual Threads?

Although thread-sharing pools such as ForkJoinPool improve task throughput, they give up some of the benefits of the thread-per-request model. Thread-per-request associates one thread with one request, which makes code easier to understand, debug and maintain; thread sharing complicates that. Virtual Threads aim to bring this style back, giving clearer code execution and a more easily understood thread structure.

Virtual threads run as daemon threads and are scheduled onto ordinary platform (carrier) threads. This article will not go into further detail; see the official JEPs from OpenJDK.
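
As a usage sketch (assuming a JDK where the virtual-thread APIs are available, i.e. JDK 21, or JDK 19/20 with preview features enabled), the thread-per-request style looks like this:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class VirtualThreadDemo {
    public static void main(String[] args) throws InterruptedException {
        // Start a single virtual thread directly
        Thread vt = Thread.ofVirtual().start(() ->
                System.out.println("running on " + Thread.currentThread()));
        vt.join();

        // One cheap virtual thread per task, in thread-per-request style
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < 10_000; i++) {
                final int id = i;
                executor.submit(() -> {
                    Thread.sleep(10); // blocking only parks the virtual thread, not its carrier thread
                    return id;
                });
            }
        } // close() waits for the submitted tasks to complete
    }
}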

Thinking About Massive Throughput? Meet Virtual Threads!

Golang Goroutine
#

Unlike an operating system thread, whose stack has a fixed size (commonly around 2 MB), a goroutine stack grows dynamically. It starts at 2 KB and grows as needed while the task runs, up to roughly 250 MB on a 32-bit machine and 1 GB on a 64-bit machine. In addition, Go periodically reclaims memory that is no longer in use to shrink stack space.

Below we look specifically at GPM, the scheduling model used to execute goroutines.

✨ When organizing this part, I found that there are many excellent articles that explain it very clearly. In this article, I will not expand on the source code in particular detail. Here are two recommended articles

深入分析Go1.18 GMP调度器底层原理-腾讯云开发者社区-腾讯云

Go 语言调度器与 Goroutine 实现原理

GPM
#

There are three important components in the GPM runtime scheduler:

  • G: a goroutine, representing a task to be executed; comparable to a Runnable
  • M: an operating system thread (OS Thread), scheduled and managed by the operating system's scheduler
  • P: a processor, the local scheduler that runs on a thread and feeds it runnable G's

The specific data structure definition is in the source code under the go/src/runtime/runtime2.go path. Only some of the parameters are listed below:

var (
    allm       *m            // list of all m
    gomaxprocs int32         // maximum number of p (GOMAXPROCS)
    ncpu       int32         // number of CPUs
    forcegc    forcegcstate  // forced-GC state
    sched      schedt        // global scheduler
    allp       []*p          // list of all p
    allpLock   mutex         // lock used when reading and writing allp
    idlepMask  pMask         // bitmask of idle p's, one bit per p
    timerpMask pMask         // bitmask of p's that may hold timers
)

type g struct {
    stack       stack   // goroutine stack memory
    _panic    *_panic // goroutine internal panic
    _defer    *_defer // goroutine internal defer
    m         *m      // The m currently used by g
    sched     gobuf   // goroutine scheduling data
    param        unsafe.Pointer  // param is a generic pointer parameter field for passing
    atomicstatus atomic.Uint32 // goroutine status
    goid         uint64        // goroutine ID
    waitsince    int64      // goroutine blocking time
    preempt       bool // preemptive signal
    ...
}

type m struct {
    g0      *g     // goroutine
    morebuf gobuf  // gobuf arg to morestack
    tls           [tlsSlots]uintptr // thread-local storage
    curg          *g // The g being executed
    p             puintptr // p for executing go code
    nextp         puintptr
    oldp          puintptr // p before executing syscall
    mstartfn      func()   // m start function
    id            int64

    // signal preemption
    gsignal *g                // signal-handling g
    goSigStack gsignalStack      // Go-allocated signal handling stack
    sigmask sigset            // storage for saved signal mask
    signalPending atomic.Uint32 // Whether this is a pending preemption signal on this M.
    preemptGen atomic.Uint32   // counts the number of completed preemption signals
    ......
}

type p struct {
    id          int32
    status      uint32
    m           muintptr   // back-link to associated m (nil if idle)
    mcache      *mcache    // p's m cache
    pcache      pageCache
    goidcache    uint64 // goroutine ids cache
    // goroutines queue
    runqhead uint32
    runqtail uint32
    runq     [256]guintptr
    gcw gcWork  // GC execution buffer
    wbBuf wbBuf // GC write buffer, may be placed in g in the future
    sysmontick  sysmontick // Record the signal last sysmon preempted
    ...
}

type schedt struct {
    goidgen   atomic.Uint64 // Global goroutine id generation
    lock mutex // Global scheduler lock
    midle        muintptr // Idle m waiting for work
    maxmcount    int32    // Maximum number of m allowed

    pidle        puintptr // Idle p

    // global runnable queue
    runq     gQueue
    runqsize int32

    // dead G global cache to avoid reallocating memory every time a goroutine is created
    gFree struct {
       lock    mutex
       stack   gList // Gs with stacks
       noStack gList // Gs without stacks
       n       int32
    }

    // sudog cache, sudog stands for goroutine in the waiting list
    sudoglock  mutex
    sudogcache *sudog

    freem *m // list of m waiting to be released

    idleTime atomic.Int64 // idle time of p

    // sysmon is used for cooperative preemption
    sysmonwait atomic.Bool
    sysmonnote note
    sysmonlock mutex
    ...
}

Combining this with the figure below: p and m are bound one-to-one, m is bound to an operating system thread, and runnable g's are scheduled onto p's local run queue runq. The global scheduler schedt maintains a global run queue from which g's are dispatched to the p's.

In layman's terms, schedt is a global pool holding g's (tasks) waiting to be bound to a p (logical processor), and each p binds an m (OS thread) that actually executes on the operating system side.

GPM.png

The core ideas behind this GPM model are:

  • Reuse threads M as much as possible: avoid frequent creation, destruction and switching of operating system threads
  • Multicore parallelism: the number of CPU cores is used to bound the number of P (and therefore how many M can run Go code at the same time); the corresponding global variable is gomaxprocs
  • Work Stealing: M prioritizes the execution of P’s local queue, and if the queue in P is idle, it will steal G from other M-bound P to execute. The source code is in go/src/runtime/proc.go, and the method call path is findRunnable() → stealWork() → runqsteal()
  • Hand Off: When M is blocked, the running queue of P on M will be handed over to other M for execution, which is implemented by the handoffp() method
  • Preemptive scheduling
    • Cooperative timeout-based preemption: to prevent starvation where a new G can never obtain an M, any G that has been running for more than 10ms must give up its M and let other Gs execute. The source code is in go/src/runtime/proc.go, and the method call path is sysmon() → retake() → preemptone()
    • Signal-based preemption: some work, such as tight computation loops and GC, never reaches a cooperative preemption point and therefore cannot be preempted by sysmon alone, so signal-based preemption was added. The idea is to register a handler for the SIGURG signal and handle it in doSigPreempt. The source code is in go/src/runtime/signal_unix.go, the method call path is sigtrampgo() → sighandler() → doSigPreempt(), and doSigPreempt updates preemptGen and signalPending on the m associated with the g

The source-code implementation of these mechanisms is explained in detail below.

Exit/panic scheduling
#

Under normal circumstances, when a goroutine exits a syscall, or when the main goroutine is waiting for a panicking goroutine's deferred functions to finish, scheduling goes through the Gosched() method. The source code is in go/src/runtime/proc.go; the method call path is Gosched() → gosched_m() → goschedImpl(), and goschedImpl removes the association between g and m and puts g back on the global scheduler's run queue runq.

// src/runtime/proc.go

// Scheduling after exiting the syscall
func exitsyscall() {
    ...
    if sched.disable.user && !schedEnabled(gp) {
          // Initiate scheduling
          Gosched()
    }
    ...
}

// The main goroutine monitors the goroutine that triggers the panic for defer
func main() {
    ...
    if runningPanicDefers.Load() != 0 {
       // Running deferred functions should not take long.
       for c := 0; c < 1000; c++ {
          if runningPanicDefers.Load() == 0 {
             break
          }
          // Initiate scheduling
          Gosched()
       }
    }
    ...
}

// Start scheduling
func Gosched() {
    checkTimeouts()
    mcall(gosched_m)
}

func gosched_m(gp *g) {
    if trace.enabled {
       traceGoSched()
    }
    goschedImpl(gp)
}

func goschedImpl(gp *g) {
    status := readgstatus(gp)
    if status&^_Gscan != _Grunning {
       dumpgstatus(gp)
       throw("bad g status")
    }
    casgstatus(gp, _Grunning, _Grunnable)
    // Remove the association between g and m
    dropg()
    lock(&sched.lock)
    // Put g back on the global queue runq
    globrunqput(gp)
    unlock(&sched.lock)

    schedule()
}

Cooperative preemptive scheduling – time-out preemption
#

Go starts a monitoring thread (sysmon) from the main goroutine to watch over the goroutines running in the GPM model. Timeout preemption is an important part of this monitoring: any goroutine that has been running for more than 10ms is preemptively rescheduled. The method call chain is main() → sysmon() → retake() → preemptone(), and the specific source code is in go/src/runtime/proc.go.

// src/runtime/proc.go

// The main goroutine.
func main() {
    ...
    if GOARCH != "wasm" { // no threads on wasm yet, so no sysmon
        systemstack(func() {
            newm(sysmon, nil, -1)
        })
    }
    ...
}

func sysmon() {
...
// retake P's blocked in syscalls
// and preempt long running G's
    if retake(now) != 0 {
        idle = 0
    } else {
        idle++
    }
    ...
}

const forcePreemptNS = 10 * 1000 * 1000 // 10ms

func retake(now int64) uint32 {
    ...
    for i := 0; i < len(allp); i++ {
        pp := allp[i]
        if pp == nil {
            continue
        }
        pd := &pp.sysmontick
        s := pp.status
        sysretake := false
        if s == _Prunning || s == _Psyscall {
            // Preempt G if it's running for too long.
            t := int64(pp.schedtick)
            if int64(pd.schedtick) != t {
                pd.schedtick = uint32(t)
                pd.schedwhen = now
			} else if pd.schedwhen+forcePreemptNS <= now { // If it's more than 10ms, preempt
                // preempt scheduling
                preemptone(pp)
                // In case of syscall, preemptone() doesn't
                // work, because there is no M wired to P.
                sysretake = true
            }
        }
    }
    ...
}

Non-cooperative preempt scheduling – signal preemption
#

Cooperative timeout-based preemption is flawed (runtime: non-cooperative goroutine preemption #24543): in some scenarios, such as tight for loops and garbage collection, a goroutine never reaches a preemption point and can occupy a thread for a long time. For this reason, signal-based preemption was added in Go 1.14.

To implement signal preemption, preemptGen and signalPending are added to M to determine the conditions for signal preemption, and pendingPreemptSignals is defined globally. The specific source code is in go/src/runtime/signal_unix.go and go/src/runtime/preempt.go, the method call chain is divided into two parts:

  • one part is sighandler() → doSigPreempt(), which is used to trigger the modification of the preemption signal
  • the other part is suspendG() → preemptM(), which is used to monitor the preemption signal and perform the actual preemption
// src/runtime/signal_unix.go 

// Trigger signal events and modify preempt signals
func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
    ...
    if sig == sigPreempt && debug.asyncpreemptoff == 0 && !delayedSignal {
       // Might be a preemption signal.
       doSigPreempt(gp, c)
    }
    ...
}

func doSigPreempt(gp *g, ctxt *sigctxt) {
    // Check if this G wants to be preempted and is safe to
    // preempt.
    if wantAsyncPreempt(gp) {
       if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
          // Adjust the PC and inject a call to asyncPreempt.
          ctxt.pushCall(abi.FuncPCABI0(asyncPreempt), newpc)
       }
    }

    // Modify the preemption signal associated with m in g
    // Acknowledge the preemption.
    gp.m.preemptGen.Add(1)
    gp.m.signalPending.Store(0)

    if GOOS == "darwin" || GOOS == "ios" {
       pendingPreemptSignals.Add(-1)
    }
}

// src/runtime/preempt.go

// Monitor preempt signal status and perform preempt actions
func suspendG(gp *g) suspendGState {
    ......
    for i := 0; ; i++ {
       switch s := readgstatus(gp); s {
          case _Grunning:
                // Prepare for asynchronous preemption.
             asyncM2 := gp.m
             // Get preemptGen in m 
             asyncGen2 := asyncM2.preemptGen.Load()
             needAsync := asyncM != asyncM2 || asyncGen != asyncGen2
             asyncM = asyncM2
             asyncGen = asyncGen2
             // cas to check g status
             casfrom_Gscanstatus(gp, _Gscanrunning, _Grunning)
             if preemptMSupported && debug.asyncpreemptoff == 0 && needAsync {
                now := nanotime()
                if now >= nextPreemptM {
                   nextPreemptM = now + yieldDelay/2
                   // preempt M
                   preemptM(asyncM)
                }
             }  
       }
       ...
    }
    ...
}

func preemptM(mp *m) {
    if GOOS == "darwin" || GOOS == "ios" {
       execLock.rlock()
    }

    if mp.signalPending.CompareAndSwap(0, 1) {
       if GOOS == "darwin" || GOOS == "ios" {
          pendingPreemptSignals.Add(1)
       }
    
       signalM(mp, sigPreempt)
    }

    if GOOS == "darwin" || GOOS == "ios" {
       execLock.runlock()
    }
}

Work stealing
#

Work stealing is a core part of the Fork-Join model, and we have already seen it in Java's ForkJoinPool. Go's implementation is similar: when the scheduler looks for work in findRunnable(), a spinning M tries to steal goroutines waiting to be executed from other p's run queues. Let's take a look at the source code, specifically in go/src/runtime/proc.go.

func schedule() {
    ...
    gp, inheritTime, tryWakeP := findRunnable() // blocks until work is available
    ...
}

func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
    ...
    // Spinning Ms: steal work from other Ps.
    if mp.spinning || 2*sched.nmspinning.Load() < gomaxprocs-sched.npidle.Load() {
       if !mp.spinning {
          mp.becomeSpinning()
       }
       // Try to steal the goroutine or timer waiting to be executed from any of the p
       gp, inheritTime, tnow, w, newWork := stealWork(now)
       if gp != nil {
          // Successfully stole.
          return gp, inheritTime, false
       }
       ...
    }
}

Go: Work-Stealing in Go Scheduler

Hand off
#

Hand off mainly deals with an m that is blocked in a syscall or on a lock: its p, together with the g's in that p's run queue, is handed over to another m to execute. This is implemented in the handoffp() method in go/src/runtime/proc.go, which is reached along the following paths:

  • m exit: mexit() → handoffp()
  • global monitor: sysmon() → retake() → handoffp()
  • entering a blocking syscall: entersyscallblock() → entersyscallblock_handoff() → handoffp()
  • exiting a syscall: exitsyscall() → exitsyscall0() → stoplockedm() → handoffp()
  • for each p when performing GC: forEachP() → handoffp()

Go keyword compilation
#

In Go, a goroutine is created simply by using the go keyword. The actual implementation is that, at compile time, the compiler converts the go statement into a call to the newproc() function. The compilation conversion is in go/src/cmd/compile/internal/ssagen/ssa.go.

// src/cmd/compile/internal/ssagen/ssa.go

func (s *state) stmt(n ir.Node) {
    ...
    s.stmtList(n.Init())
    switch n.Op() {
        case ir.OGO:
            n := n.(*ir.GoDeferStmt)
            s.callResult(n.Call.(*ir.CallExpr), callGo)
    }
}

func (s *state) call(n *ir.CallExpr, k callKind, returnResultAddr bool) *ssa.Value {
    ...
    switch {
        case k == callGo:
            aux := ssa.StaticAuxCall(ir.Syms.Newproc, s.f.ABIDefault.ABIAnalyzeTypes(nil, ACArgs, ACResults))
            call = s.newValue0A(ssa.OpStaticLECall, aux.LateExpansionResultType(), aux)
    }
}

The newproc() method creates a goroutine in the _Grunnable state and puts it on the current p's run queue:

// Create a new g running fn.
// Put it on the queue of g's waiting to run.
// The compiler turns a go statement into a call to this.
func newproc(fn *funcval) {
    gp := getg()
    pc := getcallerpc()
    systemstack(func() {
       newg := newproc1(fn, gp, pc)

       pp := getg().m.p.ptr()
       runqput(pp, newg, true)

       if mainStarted {
          wakep()
       }
    })
}

Summary
#

Java's design first implements a one-to-one mapping onto the underlying thread model and then builds concurrency tools on top of it, such as the various thread pools provided through Executors. Broadly, the two thread pools discussed above are built on the ideas of pooling and Fork/Join. The downside is that developers need to understand these tools before they can use them well, which raises the difficulty of programming.

In contrast, Go provides GPM, which is also built on Fork-Join ideas but adds monitoring and preemptive scheduling on top of them, and hides this relatively complex design inside the Go runtime. From the user's perspective, concurrency is available simply by writing the go keyword. With the scheduling guarantees provided by GPM, developers can achieve high concurrency and high throughput without having to study its design in depth, which lowers the difficulty of programming and greatly simplifies the code.

Other
#

The memory model is not covered here for now; it will be written up later. For related documents, see:

The Go Memory Model - The Go Programming Language

Java memory model

Java & Go 并发编程对比
