Goroutine

在 java/c++ 中我们要实现并发编程的时候, 我们通常需要自己维护一个线程池, 并且需要自己去包装一个又一个的任务, 同时需要自己去调度线程执行任务并维护上下文切换, 这一切通常会耗费程序员大量的心智;那么能不能有一种机制, 程序员只需要定义很多个任务, 让系统去帮助我们把这些任务分配到 CPU 上实现并发执行呢?

goroutine 的概念类似于线程, 但 goroutine 是由 Go 的运行时(runtime)调度和管理的;Go 程序会智能地将 goroutine 中的任务合理地分配给每个 CPU; 它在语言层面已经内置了调度和上下文切换的机制

在 Go 语言编程中你不需要去自己写进程, 线程, 协程, 你的技能包里只有一个技能–goroutine, 当你需要让某个任务并发执行的时候, 你只需要把这个任务包装成一个函数, 开启一个 goroutine 去执行这个函数就可以了, 就是这么简单粗暴;

用法

只需要在调用函数的时候在前面加上 go 关键字, 就可以为一个函数创建一个 goroutine;

一个 goroutine 必定对应一个函数, 可以创建多个 goroutine 去执行相同的函数

例如有如下程序:

1
2
3
4
5
6
7
func hello() {
fmt.Println("Hello Goroutine!")
}
func main() {
hello()
fmt.Println("main goroutine done!")
}

在这里, hello 函数和 main goroutine done 是串行打印的, 但是如果变成

1
2
3
4
5
6
7
func hello() {
fmt.Println("Hello Goroutine!")
}
func main() {
go hello() // 开启一个新的goroutine去执行hello函数
fmt.Println("main goroutine done!")
}

这样二者就会并行运行了

多线程同步

GPM 调度模型

这个 GPM 是三个抽象体的缩写:

  • Goroutine (G): 代表一个 goroutine, 也就是一个任务
    • 保存 goroutine 的运行信息(函数入口, 堆栈, 状态等), 以及与 P(处理器)的绑定关系
  • Processor §: 代表处理器, 不是 CPU, 而是 Go runtime 里的一个抽象
    • 维护着一个 goroutine 本地队列(run queue)
    • 保存当前执行所需的上下文(函数指针, 栈地址等)
    • 负责调度: 从队列里拿 goroutine 来执行, 如果自己的队列空了, 会去:
    1. 全局队列(global run queue)拿任务;
    2. 或者去其他 P 的队列里"偷任务"(work stealing);
    • 理解: P 让 goroutine 的调度变得可控, 避免所有 G 都堆到一个地方, 保证多核利用率
  • Machine (M): 代表一个操作系统线程, 真正执行 G 的实体

G 要跑, 必须挂在 P 上, 然后 P 绑定到 M 上执行
如果某个 G 长时间阻塞(比如系统调用), 这个 M 就会卡住;Go runtime 会新建一个 M, 把阻塞 P 上剩下的 G 挂到新 M 上, 避免影响并发

Runtime

  • runtime.Gosched(): 主动让出 CPU, 让调度器重新调度, 类似于 C 的 yield()
  • runtime.Goexit(): 终止当前 goroutine, 不会影响其他 goroutine, 类似于 C 的 exit()
  • runtime.GOMAXPROCS(n int) int: 设置可同时执行的最大 CPU 核数, 返回之前的值, 默认值是机器的 CPU 核数

CSP 并发模型

CSP 指的是 Communicating Sequential Processes, 其核心思想是:

  • 并发模型: 把系统看成一组顺序执行的进程, 通过通信事件进行交互;
  • 通信方式: 经典 CSP 是同步消息传递(rendezvous): 发送者和接收者在同一时刻配对完成一次通信事件(无缓冲);
  • 形式化: 用代数/逻辑刻画并发行为, 可进行可证明的推理(如死锁检查);

引用 UTF-8 的作者之一 Rob Pike 的发言, Do not communicate by sharing memory; instead, share memory by communicating., 也就是不要通过 482 风格的 monitors 的共享内存方法 (即多线程会共享同一个进程下的所有变量资源并且通过各种 sync primitives 来确保 mutex) 来进行线程之间的变量通信, 而是采用这里的 go 风格来实现利用管道传输共享数据
不过现代并发系统中两种风格都存在, 因为 482 风格的同步虽然写起来麻烦, 但是直接共享内存的性能 (速度)远远高于利用管道的传输速度, 虽然后者写起来会方便很多

整体上可以将模型抽象成 3 个元素:

  • 进程(Process): 独立执行的实体, 有自己的状态和行为; 在 Go 语言中对应 goroutine;
  • 通道(Channel): 进程间通信的媒介, 可以是同步或异步的; 在 Go 语言中对应 channel;
  • 事件(Event): 进程间交互的动作, 如发送/接收消息; 在 Go 语言中对应 channel 的发送/接收操作;

Channel 通道

1
var ch chan int  // 声明一个传输 int 类型数据的 channel
  • 发送接收的 happens-before 语义:
    在 Go 内存模型中, ch <- v(发送)与 <-ch(接收)之间存在同步关系;发送 goroutine 在发送点之前的所有写操作, 对接收方 goroutine 都是可见的;这保证了通过 channel 传递的值是"最新写入"的;

  • 无缓冲 channel (capacity = 0):
    无缓冲 channel 没有存储空间, 发送和接收必须同时发生, 才能完成一次通信;这被称为 rendezvous (会合点), 类似接力赛传递接力棒;

  • 值传递与所有权转移:

    • 对于值类型(如 int, struct), 发送时是拷贝;发送完成后修改原变量不会影响接收方;
    • 对于引用类型(slice, map, 指针), 发送的是引用头部, 底层数据仍可能被共享;因此推荐在发送前完成修改, 或深拷贝, 或通过所有权转移约定来保证安全;
  • 常见模式 idiom:

    1. 在发送前构造好对象;
    2. 通过 channel 传递对象;
    3. 接收方读取对象, 保证看到的值是发送方写入的结果; (happen-before)
  • 单向 channel:

可以限制 channel 的方向:

不过很多人会问, 如果 channel 只有单向那么是不是就只能从一边进行读取/写入了呢? 显然不是如此, 其实这种定义只能出现在类的内部 也就是这个单向指的是对外暴露的接口只能是一个方向的, 但是在类内部的实现函数中其实是支持另一个方向的控制的

1
2
var send chan<- int   // 只能发送
var recv <-chan int // 只能接收

双向 channel 可以赋值给单向 channel, 反之不行;

  • 示例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    func worker(ch chan int, label string) {
    val := <-ch
    fmt.Printf("%s init %d\n", label, val)
    for i := 0; i < 1000000; i++ {
    val++
    }
    fmt.Printf("%s final %d\n", label, val)
    ch <- val
    }

这里 worker 从 channel 中取值, 做一百万次加法, 再将结果写回 channel;多个 goroutine 可以像"接力棒"一样顺序接力计算;

这样如果有多个 worker, 那么其可以并行, 并且在构造完所有的 go routine 之后可以在主进程中打开第一个 channel 就可以等待这些的运行完成从而获得并发数据

close 机制

close 可以关闭一个 channel, 也就是禁止后面的代码再次向这个 channel 发送数据, 也就是禁止 ch <- val, 如果后面再次发送数值到 channel 会触发 panic

如果对于一个 buffer_size == 0 的无缓存 channel 来说, 如果关闭 channel
之后再次读取这个 channel 即 close(ch); val, ok := <-ch 那么就会得到 val == 0, ok == false, 也就是读取到 channel 的零值, 并且 ok 标志位为 false 代表 channel 已经被关闭

如果 channel 的缓存不是 0 那么就会收到 channel 里剩余的值, 直到 channel 为空, 然后再读取就会得到零值和 ok == false (原来 ok == true)

for-range 范围取值

for-range 指令不断尝试从 channel 里取值, 直到同时(这里是 and, && 逻辑)满足两个条件:

  1. channel 已经被关闭 (close(ch))
  2. channel 内部缓冲区的数据已全部取完

range 指令本身是能从 channel 中取出值的, 即等价于按照 FIFO 逐步执行 val := <-ch 直到 channel 被关闭并且内部缓冲区数据取完`

为了高效地从 有 buffer 的 channel 中取值, 可以使用 for-range 语法:

1
2
3
for val := range ch{
// val 是从 ch 中取出的值
}

例如对于无 buffer 的 channel 也是可以使用 range 读取的, 而且对这种特殊情况, range 只会在 channel 被关闭并且数据取完后才会退出循环 (因为不存在代码 buffer 未取完的情况)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ch := make(chan int)

go func() {
for i := 1; i <= 3; i++ {
ch <- i
}
close(ch) // 通知消费完毕
}()

for v := range ch { // 等待 goroutine 彻底 close ch
fmt.Println(v)
}
// 输出:
// 1 2 3

注意对于有 buffer 的 channel 来说如果channel内被读取完了, 但是没有收到 close 也会阻塞等待

for-select 多路复用

select 语句可以让一个 goroutine 等待多个通信操作, 也就是多路复用(multiplexing);它类似于 switch 语句, 但每个 case 都必须是一个通信操作(发送或接收);

在本课程的讨论范围内, 我们主要考虑 select channel 的情形:

1
2
3
4
5
6
7
8
9
10
11
12
13
for {
select {
case val = <- ch1:
// 处理 ch1 的值
case val = <- ch2:
// 处理 ch2 的值
case ch <- val:
// 发送 val 到 ch
case <- terminate:
// 退出循环
return
}
}

这里