CSP模型
CSP,全称 Communicating sequential processes
是指并发系统中的一种模式。简单来说,CSP 模型由并发执行的实体(线程或者进程或其他)所组成,实体之间通过消息进行通信。Go语言的并发特性正是基于CSP模型发展而来,具体来说,goroutine 就是并发执行的实体,而 goroutine 之前通信就是借助了 channel 来进行消息传递。Go语言有一种哲学叫做
Do not communicate by sharing memory; instead, share memory by communicating.
意思是尽量通过通信来共享内存,而不是通过共享内存来通信。由此可见 channel 在 Go 中的重要地位。
Go的channel
通常的线程模型,一个很重要的问题就是各线程之前的同步。我们通常采用条件变量或者信号量来做同步操作,如果需要传递消息,经常要自己实现一个带锁的线程安全的消息队列。
而在Go中,为了方便 goroutine 之前的通信,原生支持了一种称为 channel 的数据结构,用来做同步和消息传递。它的常用操作如下:
创建
1 | unBufferChan := make(chan int) // 无缓冲 |
读写
1 | // 阻塞读操作 |
关闭
1 | // 关闭 |
select (类似IO多路复用),只要其中一个满足,则执行后续操作
1 | select { |
以上操作都是阻塞的,同时 channel 也支持非阻塞的读写操作,类似于 IO 操作中如果没有满足条件的则返回 EAGAIN 或者 EWOULDBLOCK。这需要借助 select 的 default 分支来实现
1 | select { |
上面的非阻塞操作,如果 ch 中没有读取到数据,也不会阻塞,而是进入default分支。类似的还有非阻塞写,这里就不具体说明了。
Channel的实现原理
下面以 Go1.8 的源码为例来研究下 channel 各个操作的实现原理。
channel 结构
hchan 结构用来表示一个channel,具体信息如下
1 | type hchan struct { |
可以看到,hchan主要包含
环形队列,用来存放消息(只针对带缓冲channel)。涉及的字段有
qcount
当前队列中的消息数量dataqsiz
队列大小buf
队列具体数据bufsendx
、recvx
环形队列的发送和接收游标
- 元素信息
elemtype
、elemsize
。其中elemtype
的类型_type
是 Go 中表示变量类型的基础结构,在反射和interface 的实现中很常用。因为 channel 可以持有各种类型的数据,所以需要维护元素信息。 - lock,用来做并发互斥。
- 等待接收或等待发送的 goroutine 队列。
recvq
和sendq
都是用双向链表实现的队列,如果有 goroutine 因为读写被阻塞,就会被调度器挂起在这两个队列上。我们之前分析 Go 调度模型GMP的时候有提到,一般 G (goroutine) 和 P (proc) 挂钩,处于等待执行或者正在执行状态。如果正在执行的 G 阻塞在 channel 上,就会脱离 P,转而挂起在 channel 的recvq
或者sendq
上,完成 channel 操作后再选择合适的 P 来继续执行。 - closed。用来表示channel是否关闭。关闭 channel 可用用来实现广播的效果,后面会讲到。
- sudog 代表一个goroutine
创建channel
创建channel就是如何新建一个 hchan 结构的过程,源码如下:
1 | func makechan(t *chantype, size int64) *hchan { |
元素大小不能超过64k(1<<16)
channel的元素个数不能为负数且不能超过一定数量
如果channel里存的是非指针的具体对象,则channel和具体存放的元素buf会一起分配,GC不会扫描这部分buf数据,因为buf相当于是channel的一部分。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15if elem.kind&kindNoPointers != 0 || size == 0 {
// Allocate memory in one call.
// Hchan does not contain pointers interesting for GC in this case:
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
if size > 0 && elem.size != 0 {
c.buf = add(unsafe.Pointer(c), hchanSize)
} else {
// race detector uses this location for synchronization
// Also prevents us from pointing beyond the allocation (see issue 9401).
c.buf = unsafe.Pointer(c)
}
}否则 buf 分开分配内存,可能会被GC 回收到。
1
2
3
4} else {
c = new(hchan)
c.buf = newarray(elem, int(size))
}
创建完成后的结构,利用 gdb 调试时看到 hchan 的数据
1 | (gdb) p c |
可以看到 c 的内存地址和 buf 的内存地址是连续的。
如果创建的是指针类型 channel
, c
的地址就和 buf
地址是分开的
1 | (gdb) p c |
写channel
写对应的操作是 ch <- x
, 编译器编译后实际调用了以下源码
1 | // entry point for c <- x from compiled code |
nil channel
1 | if c == nil { |
对于nil channel
, 如果是非阻塞写,则会立即返回;如果是阻塞写,则当前 goroutine 会通过 gopark
调用进入等待状态。
closed channel
写已经closed的channel会panic
1 | if c.closed != 0 { |
recvq
如果 recvq 不为空,则表明有 goroutine 被挂起等待数据,此时即使是带缓冲的channel,也不会将数据存储到buf里,而是直接发给队列头的 goroutine。
1 | if sg := c.recvq.dequeue(); sg != nil { |
数据会从发送 goroutine 的栈上(ep)直接被拷贝到挂起的 sudog 的 数据域 elem 上,然后通过 goready 唤醒这个sudog
1 | if sg.elem != nil { |
buf not full
如果没有等待接收者,且缓冲没满,则将数据拷贝到缓冲区中,这里就是简单的环形缓冲的操作
1 | if c.qcount < c.dataqsiz { |
通过 chanbuf
定位到具体数据 qp,然后将 ep 指向的数据拷贝到其中,同时更新游标 sendx 和 元素数量 qcount
buf full
如果 buf 满了,对于非阻塞发送,就立即返回
1 | if !block { |
对于阻塞发送,当前发送 goroutine 就被阻塞住挂起在等待队列里
1 | // Block on the channel. Some receiver will complete our operation for us. |
待发送的数据 ep 被放置在 sudog 的 elem 上,sudog 本身入队到 sendq 中,并挂起。
读channel
读对应的操作是 x = <- ch
, 编译器编译后实际调用了以下源码
1 | // chanrecv receives on channel c and writes the received data to ep. |
ep 代表变量 x 指向的内存区,用来存储接收到的数据,如果 ep 为 nil,则读到的数据会被忽略
nil channel
对于未初始化的 channel,如果非阻塞读,则直接返回;如果是阻塞读,会通过 gopark 将当前 goroutine 转为等待状态。
1 | if c == nil { |
closed channel
1 | if c.closed != 0 && c.qcount == 0 { |
对于已经关闭的 channel,如果 buf 中没有数据了,则返回元素类型的零值 (通过 typedmemclr
来处理)
如果 buf 中还有数据,还是会继续走后续流程读取数据 (从挂起的 sender 中或者 buf 中)。例如
1 | ch := make(chan int, 5) |
输出以下内容
1 | data:1 isClosed:true |
sendq
如果 sendq 有被挂起等待发送数据的 goroutine,则获取队头的 goroutine,调用 recv
进行处理
1 | if sg := c.sendq.dequeue(); sg != nil { |
recv
做的具体工作我们继续看下
1 | if c.dataqsiz == 0 { |
如果 channel 是非缓冲的 (dataqsiz == 0),则接收者直接从发送者手里接收数据
1 | if c.dataqsiz == 0 { |
如果 channel 是带缓冲的 (dataqsiz > 0),则表明此时 buf 已经满了,此时才会有等待发送的 goroutine 被挂起。需要做的事情就是从环形队列中读取一个元素,这时会空出一个元素的位置,之前挂起等待的队头 sudog 的数据就被写入环形队列。如果还有其他阻塞等待写的 sugog,继续挂起等待后续的读。
1 | } else { |
处理完成之后,之前 sendq 中挂起的发送 goroutine 就被 goready
调用唤醒
1 | sg.elem = nil |
buf not empty
如果 buf 有数据,则直接将数据读出到 ep 内存处,然后将其置为零值
1 | if c.qcount > 0 { |
buf empty
如果 buf 中没有数据,对于非阻塞读取,立即返回
1 | if !block { |
对于阻塞读,则当前读 goroutine 被挂起到 recvq 中等待后续数据写
1 | // no sender available: block on this channel. |
select channel
select 用于多个channel监听并收发消息,当任何一个case满足条件则会执行,若没有可执行的case,就会执行default,如果没有default,程序就会阻塞。select 的作用很类似于 IO多路复用。
多 channel select
前面提到,典型的 select 用法是用在多个 channel 上,例如
1 | ch := make(chan int, 5) |
select 的结构为 hselect
,其定义如下
1 | // Select statement header. |
对其操作的具体源码实现是
1 | selectgoImpl(sel *hselect) (uintptr, uint16) |
select 的多个选项会被包装成多个scase
结构,然后依据 lockorder 来处获得所有锁
1 | // lock all the channels involved in the select |
sellock
是实现为
1 | func sellock(scases []scase, lockorder []uint16) { |
锁会有去重判断,方式多个case 操作一个 channel 导致重复上锁问题。
然后依次查看所有的case是否有对应的事件:
1 | for i := 0; i < int(sel.ncase); i++ { |
注意这里遍历的时候并不是按照代码里 case 的顺序,而是按照 pollorder
来的,这个 pollorder
是随机出来的顺序,因此如果有多个满足条件的 case,则最终选中的 case 是哪一个是随机的。这样能避免一直选中写在前面的 case 而导致其他 case 饿死 的情况。pollorder
通过以下代码随机洗牌而来
1 | // generate permuted order |
如果过程中遍历到的 case 有一个是非阻塞的操作,则 select 会立即返回,不会再去检查后续的 case 是否 ready;如果每个 case 对其 channel 的操作都是阻塞的且没有 default 分支,则 select 会一直阻塞,而且会挂起在涉及的所有的 channel 的 recvq 或者 sendq 上:
1 | // pass 2 - enqueue on all chans |
单 channel 非阻塞select
单 channel 的非阻塞 select 和前面多 channel 的select的实现机制不太一样,相对更简单,编译器会将其编译成 chanrecv
或者 chansend
调用
编译器会将以下非阻塞读取
1 | select { |
编译成
1 | if selectnbsend(c, v) { |
其中 非阻塞写的实现还是调用前面提到的 chansend
1 | func selectnbsend(t *chantype, c *hchan, elem unsafe.Pointer) (selected bool) { |
编译器会将以下非阻塞写
1 | select { |
编译成
1 | if selectnbrecv(&v, c) { |
其中 非阻塞读的实现还是调用前面提到的 chanrecv
1 | func selectnbrecv(t *chantype, elem unsafe.Pointer, c *hchan) (selected bool) { |
close channel
close channel 的操作对应源码中的 closechan
调用
1 | func closechan(c *hchan) { |
nil channel
如果关闭一个未初始化的 channel,会 panic
1 | if c == nil { |
closed channel
1 | if c.closed != 0 { |
关闭一个已经关闭的 channel 也会导致 panic。所以一般使用时,是由唯一的一个生产者来关闭 channel。
正常关闭
正常关闭 channel 时,首先设置 closed 标识,然后将循环遍历,将 recvq 和 sendq 中挂起等待的 goroutine 收集到 glist 链表中等待调度
1 | c.closed = 1 |
然后调度 glist 中的 goroutine
1 | // Ready all Gs now that we've dropped the channel lock. |
具体的:
- 等待读的 goroutine,会将数据区 elem 置为零值 (
typedmemclr(c.elemtype, sg.elem)
) 然后继续执行 - 等待写的 goroutine,将会 panic
参考资料