共计 7327 个字符,预计需要花费 19 分钟才能阅读完成。
背景
众所周知,Go 语言天生就支持并发编程,更进一步地,Go 语言在诞生之初就是为了方便人们能更轻松地写出高并发程序,从而更好利用日益增长的计算机多核性能。
除了 Go 之外,也不乏有能较好支持并发编程的语言例如 Java。与 Go 不同,Java 的并发利用的是同一进程中可以同时拥有多个线程。进程我们知道是系统资源分配的基本单位,操作系统将一块物理内存映射到进程地址空间上(进程创建必须分配一个完整的独立地址空间),这块内存就属于这个进程,进程内的所有线程都可以访问这块内存,但相对其他进程而言是被隔离的一块内存区域。
线程作为独立运行和独立调度的基本单位,可以被视作是进程的一个执行单位。线程上下文一般只包含 CPU上下文 及其他的线程管理信息,线程创建的开销主要取决于为线程堆栈的建立而分配内存的开销,这些开销并不大,但某些系统级的线程是基于内核实现的,这会导致线程的上下文切换开销跟进程一样大,作为一个主攻高并发领域的语言,当然不希望看到频繁的上下文切换,所以在 Go 语言中引入了协程 Goroutine 的概念。
Goroutine 本质上是个函数,它可以与程序中存在的其他 Goroutine 并发运行的同时,保持相对独立。从 Goroutine 的运行原理上来看,可以将 Goroutine 视作一种轻量级的线程,但与实际的线程相比,创建一个 Goroutine 的成本非常小。每个程序至少包含一个 Goroutine,并且该 Goroutine 被称为主 Goroutine。如果主 Goroutine终 止,那么所有由主 Goroutine 派生来的子 Goroutine 都将终止。
Goroutine 本身只是一个数据结构,真正让 Goroutine 运行起来的是调度器
调度器
为什么需要一个调度器
在计算机上运行的程序最终都是需要 CPU 去执行,协程只是运行在操作系统的用户态。协程真正的执行依然需要依靠操作系统内核态的线程去执行。
操作系统并不知道协程的存在,会把协程当做普通的程序来执行。既然协程是为了提高程序的执行效率,那么一个理想的情况是一个线程上可以执行多个协程。
如果一个协程对于一个线程,那就相当于协程的创建和运行还是由内核态来执行,这样的代价有点高。但如果一个线程上可以运行多个协程,如果其中的一个协程发生了阻塞,那么其他的协程就都无法执行了。
所以理想的情况是协程是线程的关系是 m:n,这样就可以克服 m:1 和 1:1 的缺点。但 m:n 的情况最为复杂,需要自己来实现协程在多个线程的调度,充分利用计算机的多核能力,再配合协程的轻量级的特性,实现程序的高并发。
在 Go 的实现中,Goroutine 与内核态线程的对应关系就是是 m:n,所以就需要自己实现一个协程的调度器。
调度器的结构
Go 调度器从最开始到现在也经历了不断的演进,最初的那个版本已经被放弃,目前使用的版本是在 2012 重新设计的,然后沿用至今。
现在用的这个调度器也被称之为 GMP 模型,3 个字母分表代表一个关键部件的名称:
- G:表示 goroutine,就是代表待执行的协程
- M:M 表示的是内核态的线程,goroutine 真正的执行需要依赖 M
- P:P 是调度器的核心,它会把 G 调度到合适的 M 上去执行,让 G 的执行尽可能快的完成
如果 M,也就是线程如果想要运行任务,就需要去获取一个 P,然后从 P 的任务队列中获取 goroutine 来执行。
在 P 上,会有一个正在 M 上执行的 G,但是同时也会维护一个本地的队列,里面都是待执行的 G,其中 P 的数量由 GOMAXPROCS 环境变量或者 runtime.GOMAXPROCS() 来决定,这表示在同一时间,只有 GOMAXPROCS 数量个 goroutine 在执行。
P 与 M 的数量没有固定的关系,如果当前的 M 阻塞了,P 就会去创建或者切换到另一个 M 上。
调度器是如何运作的
在介绍完 GMP 的结构之后,我们再来看一下 GMP 调度器是如何运行起来的。
在 Go 语言中,我们创建一个 goroutine 非常简单,只需要使用 go 关键字:
go func() {
fmt.Println("New goroutine")
}()
这样就会创建上面所说的一个 G,然后放进调度器中开始调度。
每个 G 在被创建之后,都会被优先放入到本地队列中,如果本地队列已经满了,就会被放入到全局队列中。
然后每个 M 就开始执行 P 的本地队列中的 G,如果某个 M 把任务都执行完成之后,然后就会去去全局队列中拿 G,这里需要注意,每次去全局队列拿 G 的时候,都需要上锁,避免同样的任务被多次拿。
如果全局队列都被拿完了,而当前 M 也没有更多的 G 可以执行的时候,它就会去其他 P 的本地队列中拿任务,这个机制被称之为 work stealing 机制,每次会拿走一半的任务,向下取整,比如另一个 P 中有 3 个任务,那一半就是一个任务。
这样还有一个特别的场景需要说明,当一个 M 被阻塞时,M 就会与 P 解绑,让 P 去找其他空闲的 M 绑定执行后面的 G,如果没有空闲的 M,就会创建一个新的 M。当 M 阻塞结束之后,就会把 G 放入到全局队列中,这个机制称之为 hand off 机制。
work stealing 和 hand off 机制提高了线程的使用效率,避免的线程重复创建和销毁。
当全局队列为空,M 也没办法从其他的 P 中拿任务的时候,就会让自身进入自选状态,等待有新的 G 进来。最多只会有 GOMAXPROCS 个 M 在自旋状态,过多 M 的自旋会浪费 CPU 资源,多余的 M 的就会与 P 解绑,进入到休眠状态。
概念介绍
在进行实现原理之前,了解下一些关键性术语的概念。
并发
一个cpu上能同时执行多项任务,在很短时间内,cpu来回切换任务执行(在某段很短时间内执行程序a,然后又迅速得切换到程序b去执行),有时间上的重叠(宏观上是同时的,微观仍是顺序执行),这样看起来多个任务像是同时执行,这就是并发。
并行
当系统有多个CPU时,每个CPU同一时刻都运行任务,互不抢占自己所在的CPU资源,同时进行,称为并行。
进程
cpu在切换程序的时候,如果不保存上一个程序的状态(也就是我们常说的context–上下文),直接切换下一个程序,就会丢失上一个程序的一系列状态,于是引入了进程这个概念,用以划分好程序运行时所需要的资源。因此进程就是一个程序运行时候的所需要的基本资源单位(也可以说是程序运行的一个实体)。
线程
cpu切换多个进程的时候,会花费不少的时间,因为切换进程需要切换到内核态,而每次调度需要内核态都需要读取用户态的数据,进程一旦多起来,cpu调度会消耗一大堆资源,因此引入了线程的概念,线程本身几乎不占有资源,他们共享进程里的资源,内核调度起来不会那么像进程切换那么耗费资源。
协程
协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此,协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作执行者则是用户自身程序,goroutine也是协程。
优势
相对于线程的优势
- 与线程相比,goroutine占用系统资源较少,在Go的最新版本中,单个goroutine的堆栈初始大小只有2kb,根据应用程序的需要堆栈的大小可以进行动态扩缩容,但对于线程而言,则必须在创建时指定堆栈大小,从这点来说goroutine对堆栈的利用效率上更胜一筹。
- Goroutines被可以被多路复用。程序中的单个线程可以同时运行数千个Goroutine,如果某个线程因为业务逻辑(等待用户输入等)处于阻塞状态,那么程序会动态的创建一个新的线程并将被阻塞线程的其余Goroutines迁移至新的os 线程,而这些动态切换都会由系统自行处理,那么使用者就可以从这些复杂的细节中抽身出来,专注于业务层面的代码开发。
- Goroutines之间使用channel进行通信,而通道channel在设计之初就是为了避免不同Goroutines访问共享内存时发生竞争,相对比起线程的加锁机制来说,协程的效率会更高一些。
Goroutine 使用
1 启动单个goroutine
启动 goroutine 的方式非常简单,只需要在调用函数(普通函数和匿名函数)前加上一个go
关键字。
我们先来看一个在 main 函数中执行普通函数调用的示例。
package main
import (
"fmt"
)
func hello() {
fmt.Println("hello")
}
func main() {
hello()
fmt.Println("你好")
}
将上面的代码编译后执行,得到的结果如下:
hello
你好
代码中 hello 函数和其后面的打印语句是串行的。
接下来我们在调用 hello 函数前面加上关键字go
,也就是启动一个 goroutine 去执行 hello 这个函数。
func main() {
go hello() // 启动另外一个goroutine去执行hello函数
fmt.Println("main goroutine done!")
}
将上述代码重新编译后执行,得到输出结果如下。
你好
这一次的执行结果只在终端打印了”你好”,并没有打印 hello
。这是为什么呢?
其实在 Go 程序启动时,Go 程序就会为 main 函数创建一个默认的 goroutine 。在上面的代码中我们在 main 函数中使用 go 关键字创建了另外一个 goroutine 去执行 hello 函数,而此时 main goroutine 还在继续往下执行,我们的程序中此时存在两个并发执行的 goroutine。当 main 函数结束时整个程序也就结束了,同时 main goroutine 也结束了,所有由 main goroutine 创建的 goroutine 也会一同退出。也就是说我们的 main 函数退出太快,另外一个 goroutine 中的函数还未执行完程序就退出了,导致未打印出“hello”。
main goroutine 就像是《权利的游戏》中的夜王,其他的 goroutine 都是夜王转化出的异鬼,夜王一死它转化的那些异鬼也就全部GG了
所以我们要想办法让 main 函数‘“等一等”将要在另一个 goroutine 中运行的 hello 函数。其中最简单粗暴的方式就是在 main 函数中“time.Sleep”
一秒钟了(这里的1秒钟只是我们为了保证新的 goroutine 能够被正常创建和执行而设置的一个值)。
按如下方式修改我们的示例代码。
package main
import (
"fmt"
"time"
)
func hello() {
fmt.Println("hello")
}
func main() {
go hello()
fmt.Println("你好")
time.Sleep(time.Second)
}
将我们的程序重新编译后再次执行,程序会在终端输出如下结果,并且会短暂停顿一会儿。
你好
hello
为什么会先打印你好
呢?
这是因为在程序中创建 goroutine 执行函数需要一定的开销,而与此同时 main 函数所在的 goroutine 是继续执行的。
在上面的程序中使用time.Sleep
让 main goroutine 等待 hello goroutine执行结束是不优雅的,当然也是不准确的。
Go 语言中通过sync
包为我们提供了一些常用的并发原语,我们会在后面的小节单独介绍sync
包中的内容。在这一小节,我们会先介绍一下 sync 包中的WaitGroup
。当你并不关心并发操作的结果或者有其它方式收集并发操作的结果时,WaitGroup
是实现等待一组并发操作完成的好方法。
下面的示例代码中我们在 main goroutine 中使用sync.WaitGroup
来等待 hello goroutine 完成后再退出。
package main
import (
"fmt"
"sync"
)
// 声明全局等待组变量
var wg sync.WaitGroup
func hello() {
fmt.Println("hello")
wg.Done() // 告知当前goroutine完成
}
func main() {
wg.Add(1) // 登记1个goroutine
go hello()
fmt.Println("你好")
wg.Wait() // 阻塞等待登记的goroutine完成
}
将代码编译后再执行,得到的输出结果和之前一致,但是这一次程序不再会有多余的停顿,hello goroutine 执行完毕后程序直接退出。
2 基本使用
设置 Goroutine 运行的 CPU数量,最新版本的 Go 已经默认已经设置了
num := runtime.NumCPU() //获取主机的逻辑CPU个数
runtime.GOMAXPROCS(num) //设置可同时执行的最大CPU数
package main
import (
"fmt"
"time"
)
func cal(a int, b int) {
c := a + b
fmt.Printf("%d + %d = %d\n", a, b, c)
}
func main() {
for i := 0; i < 10; i++ {
go cal(i, i+1) //启动10个goroutine 来计算
}
time.Sleep(time.Second * 2) // sleep作用是为了等待所有任务完成
}
运行结果:
3 异常捕捉
当启动多个 Goroutine 时,如果其中一个 Goroutine异常了,并且我们并没有对进行异常处理,那么整个程序都会终止,所以我们在编写程序时候最好每个 Goroutine 所运行的函数都做异常处理,异常处理采用 recover
package main
import (
"fmt"
"time"
)
func addele(a []int, i int) {
defer func() { //匿名函数捕获错误
err := recover()
if err != nil {
fmt.Println("add ele fail")
}
}()
a[i] = i
fmt.Println(a)
}
func main() {
Arry := make([]int, 4)
for i := 0; i < 10; i++ {
go addele(Arry, i)
}
time.Sleep(time.Second * 2)
}
4 同步的 Goroutine
由于 Goroutine 是异步执行的,那很有可能出现主程序退出时还有 Goroutine 没有执行完,此时 Goroutine 也会跟着退出。此时如果想等到所有 Goroutine 任务执行完毕才退出,Go 提供了 sync 包和 channel 来解决同步问题,当然如果你能预测每个 Goroutine 执行的时间,你还可以通过 time.Sleep
方式等待所有的 Groutine 执行完成以后在退出程序。
使用 sync 包同步 Goroutine
WaitGroup 等待一组 Goroutinue 执行完毕,主程序调用 Add 添加等待的 Goroutinue 数量,每个 Goroutinue 在执行结束时调用 Done ,此时等待队列数量减1,主程序通过 Wait 阻塞,直到等待队列为0
package main
import (
"fmt"
"sync"
)
func cal(a int, b int, wg *sync.WaitGroup) {
c := a + b
fmt.Printf("%d + %d = %d\n", a, b, c)
defer wg.Done() //goroutinue完成后, WaitGroup的计数-1
}
func main() {
var wg sync.WaitGroup //声明一个WaitGroup变量
for i := 0; i < 10; i++ {
wg.Add(1) // WaitGroup的计数加1
go cal(i, i+1, &wg)
}
wg.Wait() //等待所有goroutine执行完毕
}
通过channel实现goroutine之间的同步
通过 channel 能在多个 Groutine 之间通讯,当一个 Goroutine 完成时候向 channel 发送退出信号,等所有goroutine退出时候,利用 for 循环 channe 取 channel 中的信号,若取不到数据会阻塞原理,等待所有 Goroutine 执行完毕,使用该方法有个前提是你已经知道了你启动了多少个 Goroutine
package main
import (
"fmt"
"time"
)
func cal(a int, b int, Exitchan chan bool) {
c := a + b
fmt.Printf("%d + %d = %d\n", a, b, c)
time.Sleep(time.Second * 2)
Exitchan <- true
}
func main() {
Exitchan := make(chan bool, 10) //声明并分配管道内存
for i := 0; i < 10; i++ {
go cal(i, i+1, Exitchan)
}
for j := 0; j < 10; j++ {
<-Exitchan //取信号数据,如果取不到则会阻塞
}
close(Exitchan) // 关闭管道
}
使用channel模拟消费者和生产者模式
Goroutine 本质上是协程,可以理解为不受内核调度,而受 go 调度器管理的线程,Goroutine 之间可以通过 channel 进行通信或者说是数据共享,当然你也可以使用全局变量来进行数据共享。
package main
import (
"fmt"
"sync"
)
func Productor(mychan chan int, data int, wait *sync.WaitGroup) {
mychan <- data
fmt.Println("product data:", data)
wait.Done()
}
func Consumer(mychan chan int, wait *sync.WaitGroup) {
a := <-mychan
fmt.Println("consumer data:", a)
wait.Done()
}
func main() {
datachan := make(chan int, 100) //通讯数据管道
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
go Productor(datachan, i, &wg) //生产数据
wg.Add(1)
}
for j := 0; j < 10; j++ {
go Consumer(datachan, &wg) //消费数据
wg.Add(1)
}
wg.Wait()
}