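A Study of Go: Channels

The CSP Model

CSP, short for Communicating Sequential Processes, is a model for describing concurrent systems. Put simply, a CSP system consists of entities that execute concurrently (threads, processes, or something else) and that communicate with one another by passing messages. Go's concurrency features grew out of the CSP model: goroutines are the concurrently executing entities, and communication between goroutines is done by passing messages over channels. Go has a well-known proverb: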

Do not communicate by sharing memory; instead, share memory by communicating.

In other words, prefer sharing memory by communicating over communicating by sharing memory. This shows how central channels are to Go.

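Channels in Go

In a conventional threading model, one of the key problems is synchronization between threads. We usually synchronize with condition variables or semaphores, and when messages have to be passed we often end up writing our own lock-protected, thread-safe message queue.

To make communication between goroutines easier, Go natively supports a data structure called a channel, which handles both synchronization and message passing. Its common operations are as follows:

Creation

unBufferChan := make(chan int)  // unbuffered
bufferChan := make(chan int, N) // buffered, with capacity N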
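
As a minimal sketch of the difference between the two forms, the snippet below creates one channel of each kind and inspects it with the built-in len and cap. The helper name bufferedState and the sizes used are illustrative assumptions, not part of the original article.

```go
package main

import "fmt"

// bufferedState creates a buffered channel of the given size, sends n
// values into it without any receiver, and reports len and cap.
// It assumes n <= size; otherwise the send would block forever.
func bufferedState(size, n int) (length, capacity int) {
	ch := make(chan int, size)
	for i := 0; i < n; i++ {
		ch <- i // does not block while the buffer has room
	}
	return len(ch), cap(ch)
}

func main() {
	unbuffered := make(chan int)
	fmt.Println(len(unbuffered), cap(unbuffered)) // 0 0

	l, c := bufferedState(3, 2)
	fmt.Println(l, c) // 2 3
}
```

An unbuffered channel always reports a capacity of 0, which is why every send on it has to meet a receiver.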

Sending and receiving

// Blocking receive
x := <-ch

// Blocking send
ch <- x

Closing

// Close the channel
close(ch)

select (similar to I/O multiplexing): as soon as one of the cases can proceed, its body is executed

select {
case e, ok := <-ch1:
	...
case e, ok := <-ch2:
	...
}

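All of the operations above block. Channels also support non-blocking sends and receives, much like I/O calls that return EAGAIN or EWOULDBLOCK when the operation cannot proceed. This is done with the default branch of select:

select {
case e := <-ch:
	...
default:
}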

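In the non-blocking receive above, if no data can be read from ch the goroutine does not block; the default branch runs instead. A non-blocking send follows the same pattern, with the send placed in the case.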
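
For completeness, here is a minimal sketch of the non-blocking send. The helper trySend is a hypothetical name introduced for illustration.

```go
package main

import "fmt"

// trySend attempts a non-blocking send of v on ch.
// It reports whether the value was sent.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		// The send would block (buffer full or no receiver ready).
		return false
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1)) // true: the buffer has room
	fmt.Println(trySend(ch, 2)) // false: the buffer is full
}
```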

How channels are implemented

The rest of this article uses the Go 1.8 source code to examine how each channel operation is implemented.

The channel structure

A channel is represented by the hchan struct, defined as follows:

type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters

// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}

type waitq struct {
first *sudog
last *sudog
}

// sudog represents a g in a wait list, such as for sending/receiving
// on a channel.
//
// sudog is necessary because the g ↔ synchronization object relation
// is many-to-many. A g can be on many wait lists, so there may be
// many sudogs for one g; and many gs may be waiting on the same
// synchronization object, so there may be many sudogs for one object.
//
// sudogs are allocated from a special pool. Use acquireSudog and
// releaseSudog to allocate and free them.
type sudog struct {
// The following fields are protected by the hchan.lock of the
// channel this sudog is blocking on. shrinkstack depends on
// this.

g *g
selectdone *uint32 // CAS to 1 to win select race (may point to stack)
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)

// The following fields are never accessed concurrently.
// waitlink is only accessed by g.

acquiretime int64
releasetime int64
ticket uint32
waitlink *sudog // g.waiting list
c *hchan // channel
}

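As shown above, hchan mainly contains:

  • A circular queue that stores messages (buffered channels only). The fields involved are:

    • qcount: the number of messages currently in the queue

    • dataqsiz: the capacity of the queue

    • buf: the memory that holds the queued elements

    • sendx, recvx: the send and receive cursors of the circular queue

  • Element information: elemtype and elemsize. The type of elemtype, _type, is the runtime's basic representation of a Go type and is used heavily in the implementation of reflection and interfaces. Because a channel can carry elements of any type, it has to keep track of this information.
  • lock, which provides mutual exclusion between concurrent operations.
  • The queues of goroutines waiting to receive or to send. recvq and sendq are both queues implemented as doubly linked lists; a goroutine that blocks on a receive or a send is parked on one of them. As mentioned in our earlier analysis of Go's GMP scheduling model, a G (goroutine) is normally attached to a P (processor), either waiting to run or running. When a running G blocks on a channel, it detaches from its P and is parked on the channel's recvq or sendq; once the channel operation completes, a suitable P is chosen for it to continue on.
  • closed, which records whether the channel has been closed. Closing a channel can be used to implement a broadcast, as discussed later.
  • sudog, which represents a goroutine sitting in a wait queue (one goroutine may own several sudogs).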
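
To make the role of qcount, dataqsiz, buf, sendx and recvx concrete, here is a simplified, single-goroutine model of the circular queue. It is only a sketch: the type ring and its methods are hypothetical, elements are fixed to int, and the lock, wait queues and closed flag are left out.

```go
package main

import "fmt"

// ring is a simplified model of hchan's circular queue.
type ring struct {
	qcount   int   // number of elements currently queued
	dataqsiz int   // capacity of the queue
	buf      []int // element storage
	sendx    int   // index of the next slot to write
	recvx    int   // index of the next slot to read
}

func newRing(size int) *ring {
	return &ring{dataqsiz: size, buf: make([]int, size)}
}

// push enqueues v and reports false if the queue is full.
func (r *ring) push(v int) bool {
	if r.qcount == r.dataqsiz {
		return false
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == r.dataqsiz {
		r.sendx = 0 // wrap around
	}
	r.qcount++
	return true
}

// pop dequeues the oldest element and reports false if the queue is empty.
func (r *ring) pop() (int, bool) {
	if r.qcount == 0 {
		return 0, false
	}
	v := r.buf[r.recvx]
	r.buf[r.recvx] = 0 // clear the slot after reading it
	r.recvx++
	if r.recvx == r.dataqsiz {
		r.recvx = 0 // wrap around
	}
	r.qcount--
	return v, true
}

func main() {
	r := newRing(2)
	r.push(1)
	r.push(2)
	fmt.Println(r.push(3)) // false: the queue is full
	v, _ := r.pop()
	fmt.Println(v) // 1
	r.push(3)
	fmt.Println(r.sendx, r.recvx, r.qcount) // 1 1 2
}
```

When the queue is full, sendx and recvx point at the same slot, a property the receive path relies on when a sender is waiting.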

Creating a channel

Creating a channel amounts to allocating and initializing a new hchan. The source is:

func makechan(t *chantype, size int64) *hchan {
elem := t.elem

// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
if size < 0 || int64(uintptr(size)) != size || (elem.size > 0 && uintptr(size) > (_MaxMem-hchanSize)/elem.size) {
panic(plainError("makechan: size out of range"))
}

var c *hchan
if elem.kind&kindNoPointers != 0 || size == 0 {
// Allocate memory in one call.
// Hchan does not contain pointers interesting for GC in this case:
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
if size > 0 && elem.size != 0 {
c.buf = add(unsafe.Pointer(c), hchanSize)
} else {
// race detector uses this location for synchronization
// Also prevents us from pointing beyond the allocation (see issue 9401).
c.buf = unsafe.Pointer(c)
}
} else {
c = new(hchan)
c.buf = newarray(elem, int(size))
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)

if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
}
return c
}
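
  • The element size must be smaller than 64 KB (1<<16 bytes).

  • The number of elements must not be negative and must not exceed a limit derived from the maximum allocatable memory.

  • If the element type contains no pointers (or the channel is unbuffered), hchan and the element buffer buf are allocated together in a single call. The GC does not need to scan this memory, because buf is effectively part of the channel and holds no pointers.
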
    if elem.kind&kindNoPointers != 0 || size == 0 {
    // Allocate memory in one call.
    // Hchan does not contain pointers interesting for GC in this case:
    // buf points into the same allocation, elemtype is persistent.
    // SudoG's are referenced from their owning thread so they can't be collected.
    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
    if size > 0 && elem.size != 0 {
    c.buf = add(unsafe.Pointer(c), hchanSize)
    } else {
    // race detector uses this location for synchronization
    // Also prevents us from pointing beyond the allocation (see issue 9401).
    c.buf = unsafe.Pointer(c)
    }
    }
  • Otherwise, buf is allocated separately from hchan as a typed array, so the GC scans it for pointers.

    } else {
    c = new(hchan)
    c.buf = newarray(elem, int(size))
    }
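
The size check in makechan can be observed from ordinary Go code when the size is only known at run time. The following is a minimal sketch; the helper tryMake is a hypothetical name, and the exact panic text is taken from the makechan source quoted above and may differ in other Go versions.

```go
package main

import "fmt"

// tryMake calls make with a run-time size. It returns the capacity of the
// new channel, or the recovered panic message if make panicked.
func tryMake(n int) (capacity int, panicMsg string) {
	defer func() {
		if r := recover(); r != nil {
			panicMsg = fmt.Sprint(r)
		}
	}()
	ch := make(chan int, n)
	return cap(ch), ""
}

func main() {
	fmt.Println(tryMake(10)) // capacity 10, empty message
	// A negative size is rejected by the range check in makechan.
	_, msg := tryMake(-1)
	fmt.Println(msg != "") // true: make panicked
}
```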

After creation, the hchan looks like this when inspected with gdb:

(gdb) p c
$1 = (struct runtime.hchan *) 0xc420072000
(gdb) p *c
$2 = {qcount = 0, dataqsiz = 10, buf = 0xc420072060, elemsize = 8, closed = 0, elemtype = 0x1097040 <type.*+56064>, sendx = 0, recvx = 0, recvq = {first = 0x0, last = 0x0},
sendq = {first = 0x0, last = 0x0}, lock = {key = 0}}
(gdb) p *c.elemtype
$3 = {size = 8, ptrdata = 0, hash = 4149441018, tflag = 7 '\a', align = 8 '\b', fieldalign = 8 '\b', kind = 130 '\202', alg = 0x110bbf0 <runtime.algarray+80>,
gcdata = 0x10bca3b <runtime.gcbits.*> "\001\002\003\004\005\006\a\b\n\f\r\016\017\020\022\025\026\030\031\032\033\036\037,568<ABUXr~\236\325\330\365\377\001\002\037\003%\004I\022U\001U\005U\025UUu\002y\001\224\a\230\a\230\177\330\003\340?\376\005\376!\377\377\001\016\034", str = 955, ptrToThis = 37088}

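Note that buf (0xc420072060) lies immediately after c (0xc420072000) in memory: the two share one allocation, with buf starting 0x60 bytes past the start of the hchan.

If the channel's element type is a pointer, c and buf live at unrelated addresses:
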
(gdb) p c
$1 = (struct runtime.hchan *) 0xc420068060
(gdb) p *c
$2 = {qcount = 0, dataqsiz = 10, buf = 0xc420016140, elemsize = 8, closed = 0, elemtype = 0x1092500 <type.*+37088>, sendx = 0, recvx = 0, recvq = {first = 0x0, last = 0x0},
sendq = {first = 0x0, last = 0x0}, lock = {key = 0}}

Sending to a channel

A send is written ch <- x. The compiler turns it into a call to the following runtime code:

// entry point for c <- x from compiled code
//go:nosplit
func chansend1(t *chantype, c *hchan, elem unsafe.Pointer) {
chansend(t, c, elem, true, getcallerpc(unsafe.Pointer(&t)))
}
/*
* generic single channel send/recv
* If block is not nil,
* then the protocol will not
* sleep but return if it could
* not complete.
*
* sleep can wake up with g.param == nil
* when a channel involved in the sleep has
* been closed. it is easiest to loop and re-run
* the operation; we'll see that it's now closed.
*/
func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if raceenabled {
raceReadObjectPC(t.elem, ep, callerpc, funcPC(chansend))
}
if msanenabled {
msanread(ep, t.elem.size)
}

if c == nil {
if !block {
return false
}
gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
throw("unreachable")
}

if debugChan {
print("chansend: chan=", c, "\n")
}

if raceenabled {
racereadpc(unsafe.Pointer(c), callerpc, funcPC(chansend))
}

// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not closed, we observe that the channel is
// not ready for sending. Each of these observations is a single word-sized read
// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
// Because a closed channel cannot transition from 'ready for sending' to
// 'not ready for sending', even if the channel is closed between the two observations,
// they imply a moment between the two when the channel was both not yet closed
// and not ready for sending. We behave as if we observed the channel at that moment,
// and report that the send cannot proceed.
//
// It is okay if the reads are reordered here: if we observe that the channel is not
// ready for sending and then observe that it is not closed, that implies that the
// channel wasn't closed during the first observation.
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}

var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}

lock(&c.lock)

if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}

if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) })
return true
}

if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}

if !block {
unlock(&c.lock)
return false
}

// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}

nil channel

if c == nil {
if !block {
return false
}
gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
throw("unreachable")
}

For a nil channel, a non-blocking send returns immediately, while a blocking send parks the current goroutine via gopark, where it waits forever.
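
The non-blocking half of this behavior is easy to observe. A minimal sketch, where trySendNil is a hypothetical helper name:

```go
package main

import "fmt"

// trySendNil performs a non-blocking send on a nil channel and reports
// whether the send case was selected.
func trySendNil() bool {
	var ch chan int // nil: never initialized with make
	select {
	case ch <- 1:
		return true
	default:
		// A send on a nil channel can never proceed, so default runs.
		return false
	}
}

func main() {
	fmt.Println(trySendNil()) // false
}
```

The blocking form (a plain ch <- 1 on a nil channel) is not shown, because the goroutine would never be woken up.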

closed channel

Sending on a closed channel panics.

if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
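
This panic can be observed from ordinary Go code. A minimal sketch, assuming a hypothetical helper sendOnClosed that recovers the panic and returns its message:

```go
package main

import "fmt"

// sendOnClosed sends on a closed channel and returns the recovered
// panic message.
func sendOnClosed() (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	ch := make(chan int, 1)
	close(ch)
	ch <- 1 // panics, even though the buffer has room
	return ""
}

func main() {
	fmt.Println(sendOnClosed()) // send on closed channel
}
```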
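
recvq

If recvq is not empty, some goroutine is parked waiting for data. In that case the value is not stored in buf, even for a buffered channel; it is handed directly to the goroutine at the head of the queue.
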
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) })
return true
}

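send copies the value directly from the sender's memory (ep) into the location that the parked receiver's sudog field elem points to, which is the receiver's destination variable, and then wakes the receiving goroutine with goready.
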
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, 4)

buf not full

If no receiver is waiting and the buffer is not full, the value is copied into the buffer. This is an ordinary circular-buffer enqueue.

if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}

chanbuf locates the slot qp, the value that ep points to is copied into it, and the cursor sendx and the element count qcount are updated.

buf full

If buf is full, a non-blocking send returns immediately.

if !block {
unlock(&c.lock)
return false
}

A blocking send parks the current goroutine on the wait queue.

// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

A pointer to the value being sent, ep, is stored in the sudog's elem field; the sudog is enqueued on sendq and the goroutine is parked.

Receiving from a channel

A receive is written x = <-ch. The compiler turns it into a call that ends up in the following runtime code:

// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: don't need to check ep, as it is always on the stack
// or is new memory allocated by reflect.

if debugChan {
print("chanrecv: chan=", c, "\n")
}

if c == nil {
if !block {
return
}
gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
throw("unreachable")
}

// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not ready for receiving, we observe that the
// channel is not closed. Each of these observations is a single word-sized read
// (first c.sendq.first or c.qcount, and second c.closed).
// Because a channel cannot be reopened, the later observation of the channel
// being not closed implies that it was also not closed at the moment of the
// first observation. We behave as if we observed the channel at that moment
// and report that the receive cannot proceed.
//
// The order of operations is important here: reversing the operations can lead to
// incorrect behavior when racing with a close.
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}

var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}

lock(&c.lock)

if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(unsafe.Pointer(c))
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}

if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) })
return true, true
}

if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}

if !block {
unlock(&c.lock)
return false, false
}

// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)

// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}

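ep points to the memory of the variable x, where the received value is stored. If ep is nil, the received value is discarded.

nil channel

For an uninitialized (nil) channel, a non-blocking receive returns immediately, while a blocking receive parks the current goroutine via gopark.
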
if c == nil {
if !block {
return
}
gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
throw("unreachable")
}

closed channel

if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(unsafe.Pointer(c))
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}

For a closed channel with no data left in buf, the receive returns the zero value of the element type (produced by typedmemclr).

If buf still holds data, the receive falls through to the normal path and reads the remaining data from buf. For example:

ch := make(chan int, 5)

ch <- 1
ch <- 2
ch <- 3
ch <- 4
ch <- 5
close(ch)

// After close, remaining data can still be read. The second value reports
// whether a real value was received; it is not an "is closed" flag.
data, ok := <-ch
log.Printf("data:%d ok:%v", data, ok)
log.Printf("after closed data:%d %d", <-ch, <-ch)
for data := range ch {
	log.Printf("range data:%d", data)
}

// Once the data is drained, further receives return the zero value.
data = <-ch
log.Printf("continue read: data %d", data)

This prints the following (log timestamps omitted):

data:1 ok:true
after closed data:2 3
range data:4
range data:5
continue read: data 0
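
The two-value receive is what lets a receiver tell a real value apart from the zero value produced by a closed, drained channel. A small sketch of that idiom follows; the helper drain is a hypothetical name.

```go
package main

import "fmt"

// drain reads from ch until the receive reports ok == false, which only
// happens once ch is closed and empty, and returns the values received.
func drain(ch <-chan int) []int {
	var out []int
	for {
		v, ok := <-ch
		if !ok {
			return out
		}
		out = append(out, v)
	}
}

func main() {
	ch := make(chan int, 3)
	ch <- 0 // a genuine zero value
	ch <- 2
	close(ch)

	fmt.Println(drain(ch)) // [0 2]

	v, ok := <-ch
	fmt.Println(v, ok) // 0 false: zero value from a closed, empty channel
}
```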

sendq

If goroutines are parked on sendq waiting to send, the one at the head of the queue is dequeued and handed to recv.

if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) })
return true, true
}

Let us look at what recv does:

if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
}
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, 4)

If the channel is unbuffered (dataqsiz == 0), the receiver takes the value directly from the sender.

if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
} // else: the value is discarded
}

If the channel is buffered (dataqsiz > 0), a sender can only be parked when buf is full. The receiver takes the element at the head of the circular queue, which frees one slot, and the value of the sudog at the head of sendq is written into that same slot. Any other parked senders stay parked until later receives.

} else {
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
}
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
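
The "same slot" trick is easier to see in a small model. The sketch below is a simplification under stated assumptions: fullQueue and its recv method are hypothetical names, elements are ints, and the queue is assumed to be full when recv is called, as it is in the runtime whenever a sender is parked.

```go
package main

import "fmt"

// fullQueue models a full circular buffer with one parked sender.
type fullQueue struct {
	buf   []int
	recvx int
	sendx int
}

// recv hands the head element to the receiver and stores the parked
// sender's value in the slot that was just freed.
func (q *fullQueue) recv(senderVal int) int {
	v := q.buf[q.recvx]        // copy data from queue to receiver
	q.buf[q.recvx] = senderVal // copy data from sender to queue
	q.recvx++
	if q.recvx == len(q.buf) {
		q.recvx = 0
	}
	q.sendx = q.recvx // the queue is full again, so the cursors coincide
	return v
}

func main() {
	q := &fullQueue{buf: []int{1, 2, 3}} // full, recvx == sendx == 0
	fmt.Println(q.recv(4))               // 1
	fmt.Println(q.buf, q.recvx, q.sendx) // [4 2 3] 1 1
}
```

FIFO order is preserved: the next reads start at index 1 and yield 2, 3, and then 4.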

Finally, the sending goroutine that was parked on sendq is woken up with goready.

sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, 4)

buf not empty

If buf holds data, the element is copied out to the memory at ep and the buffer slot is then reset to the zero value.

if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
// copy the element from qp into ep
typedmemmove(c.elemtype, ep, qp)
}
// reset the buffer slot to the zero value
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}

buf empty

If buf holds no data, a non-blocking receive returns immediately.

if !block {
unlock(&c.lock)
return false, false
}

A blocking receive parks the current goroutine on recvq until a later send supplies data.

// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)

select on channels

select waits on several channels at once for sends and receives. As soon as any case can proceed it is executed; if no case is ready the default branch runs, and without a default branch the goroutine blocks. In this respect select resembles I/O multiplexing.

select over multiple channels

As noted earlier, the typical use of select involves several channels, for example:

ch := make(chan int, 5)
chs := make(chan string, 5)
select {
case msg := <-ch:
	fmt.Println("received msg: ", msg)
case msgs := <-chs:
	fmt.Println("received msgs: ", msgs)
default:
	fmt.Println("no message received")
}

A select statement is represented by the hselect struct, defined as follows:

// Select statement header.
// Known to compiler.
// Changes here must also be made in src/cmd/internal/gc/select.go's selecttype.
type hselect struct {
tcase uint16 // total count of scase[]
ncase uint16 // currently filled scase[]
pollorder *uint16 // case poll order
lockorder *uint16 // channel lock order
scase [1]scase // one per case (in order of appearance)
}

// Select case descriptor.
// Known to compiler.
// Changes here must also be made in src/cmd/internal/gc/select.go's selecttype.
type scase struct {
elem unsafe.Pointer // data element
c *hchan // chan
pc uintptr // return pc
kind uint16
so uint16 // vararg of selected bool
receivedp *bool // pointer to received bool (recv2)
releasetime int64
}

The function that operates on it is:

func selectgoImpl(sel *hselect) (uintptr, uint16)

The cases of a select are wrapped into scase structs, and the locks of all channels involved are then acquired in lockorder:

// lock all the channels involved in the select
sellock(scases, lockorder)

sellock is implemented as:

func sellock(scases []scase, lockorder []uint16) {
var c *hchan
for _, o := range lockorder {
c0 := scases[o].c
if c0 != nil && c0 != c {
c = c0
lock(&c.lock)
}
}
}

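Locking skips a channel that is identical to the one just locked. Because lockorder sorts the cases by channel address, cases that operate on the same channel are adjacent, so this check prevents the same channel from being locked twice.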
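
The same idea, locking in a fixed global order and skipping adjacent duplicates, can be sketched with ordinary mutexes. Everything here is hypothetical and for illustration only: resource stands in for a channel, its id plays the role of the channel address, ids are assumed to be unique, and the slice is assumed to contain no nil entries.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// resource stands in for a channel; id plays the role of its address.
type resource struct {
	id int
	mu sync.Mutex
}

// lockAll locks every distinct resource exactly once, in ascending id
// order, and returns the resources in the order they were locked.
func lockAll(rs []*resource) []*resource {
	sorted := append([]*resource(nil), rs...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].id < sorted[j].id })

	var locked []*resource
	var prev *resource
	for _, r := range sorted {
		if r != prev { // duplicates are adjacent after sorting
			prev = r
			r.mu.Lock()
			locked = append(locked, r)
		}
	}
	return locked
}

// unlockAll releases the locks in reverse order.
func unlockAll(locked []*resource) {
	for i := len(locked) - 1; i >= 0; i-- {
		locked[i].mu.Unlock()
	}
}

func main() {
	a := &resource{id: 1}
	b := &resource{id: 2}
	// b appears twice, as it would if two cases used the same channel.
	locked := lockAll([]*resource{b, a, b})
	fmt.Println(len(locked), locked[0].id, locked[1].id) // 2 1 2
	unlockAll(locked)
}
```

Without the duplicate check, the second attempt to lock b would deadlock. Acquiring locks in one global order also means two goroutines locking overlapping sets cannot deadlock each other.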

Next, every case is checked in turn to see whether it can proceed:

for i := 0; i < int(sel.ncase); i++ {
// iterate in pollorder
cas = &scases[pollorder[i]]
c = cas.c

switch cas.kind {
case caseRecv:
sg = c.sendq.dequeue()
if sg != nil {
goto recv
}
if c.qcount > 0 {
goto bufrecv
}
if c.closed != 0 {
goto rclose
}

case caseSend:
if raceenabled {
racereadpc(unsafe.Pointer(c), cas.pc, chansendpc)
}
if c.closed != 0 {
goto sclose
}
sg = c.recvq.dequeue()
if sg != nil {
goto send
}
if c.qcount < c.dataqsiz {
goto bufsend
}

case caseDefault:
dfl = cas
}
}

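Note that the cases are not visited in source order but in pollorder, which is a random permutation. If several cases are ready, the one selected is therefore random, which prevents the cases written first from always winning and starving the others. pollorder is produced by the following shuffle:
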
// generate permuted order
pollslice := slice{unsafe.Pointer(sel.pollorder), int(sel.ncase), int(sel.ncase)}
pollorder := *(*[]uint16)(unsafe.Pointer(&pollslice))
for i := 1; i < int(sel.ncase); i++ {
j := int(fastrand()) % (i + 1)
pollorder[i] = pollorder[j]
pollorder[j] = uint16(i)
}
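
The loop above is an "inside-out" shuffle: it builds a random permutation of 0..n-1 in a zero-initialized array without filling it in first. A standalone sketch follows, where permute is a hypothetical name and math/rand stands in for the runtime's fastrand:

```go
package main

import (
	"fmt"
	"math/rand"
)

// permute returns a random permutation of 0..n-1 using the same
// inside-out shuffle as the pollorder loop.
func permute(n int) []uint16 {
	order := make([]uint16, n) // order[0] starts as 0
	for i := 1; i < n; i++ {
		j := rand.Intn(i + 1) // stands in for fastrand() % (i + 1)
		order[i] = order[j]   // move the old occupant of slot j to the end
		order[j] = uint16(i)  // place the new value i in slot j
	}
	return order
}

func main() {
	// The output varies from run to run; it is always some ordering
	// of the values 0 through 4.
	fmt.Println(permute(5))
}
```

When j equals i, the first assignment copies the still-zero slot onto itself and the second overwrites it with i, so the value i simply stays at the end.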

If a case encountered during this pass can proceed without blocking, select handles it and returns immediately, without checking whether later cases are ready. If every case would block and there is no default branch, the goroutine blocks and is enqueued on the recvq or sendq of every channel involved:

// pass 2 - enqueue on all chans
gp = getg()
done = 0
if gp.waiting != nil {
throw("gp.waiting != nil")
}
nextp = &gp.waiting
for _, casei := range lockorder {
cas = &scases[casei]
c = cas.c
sg := acquireSudog()
sg.g = gp
// Note: selectdone is adjusted for stack copies in stack1.go:adjustsudogs
sg.selectdone = (*uint32)(noescape(unsafe.Pointer(&done)))
// No stack splits between assigning elem and enqueuing
// sg on gp.waiting where copystack can find it.
sg.elem = cas.elem
sg.releasetime = 0
if t0 != 0 {
sg.releasetime = -1
}
sg.c = c
// Construct waiting list in lock order.
*nextp = sg
nextp = &sg.waitlink

switch cas.kind {
case caseRecv:
c.recvq.enqueue(sg)

case caseSend:
c.sendq.enqueue(sg)
}
}

// wait for someone to wake us up
gp.param = nil
gopark(selparkcommit, nil, "select", traceEvGoBlockSelect, 2)
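
Non-blocking select on a single channel

A non-blocking select on a single channel is implemented differently from the multi-channel select above and is simpler: the compiler turns it into a call to chansend or chanrecv.

The compiler turns the following non-blocking send
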
select {
case c <- v:
... foo
default:
... bar
}

into

if selectnbsend(c, v) {
... foo
} else {
... bar
}

The non-blocking send itself is implemented with the chansend function described earlier:

func selectnbsend(t *chantype, c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(t, c, elem, false, getcallerpc(unsafe.Pointer(&t)))
}

The compiler turns the following non-blocking receive

select {
case v = <-c:
... foo
default:
... bar
}

into

if selectnbrecv(&v, c) {
... foo
} else {
... bar
}

The non-blocking receive is likewise implemented with the chanrecv function described earlier:

func selectnbrecv(t *chantype, elem unsafe.Pointer, c *hchan) (selected bool) {
selected, _ = chanrecv(t, c, elem, false)
return
}
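
From user code, the receive side of this pattern looks like the sketch below. tryRecv is a hypothetical helper; its second result mirrors the selected value returned by selectnbrecv, so it reports whether the receive case ran, not whether the channel is open.

```go
package main

import "fmt"

// tryRecv performs a non-blocking receive on ch. selected reports whether
// the receive case was chosen. For a closed, empty channel the case is
// chosen too and v is the zero value.
func tryRecv(ch <-chan int) (v int, selected bool) {
	select {
	case v = <-ch:
		return v, true
	default:
		return 0, false
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(tryRecv(ch)) // 0 false: nothing buffered yet
	ch <- 7
	fmt.Println(tryRecv(ch)) // 7 true
}
```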

Closing a channel

close(ch) corresponds to the closechan function in the runtime:

func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}

lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}

if raceenabled {
callerpc := getcallerpc(unsafe.Pointer(&c))
racewritepc(unsafe.Pointer(c), callerpc, funcPC(closechan))
racerelease(unsafe.Pointer(c))
}

c.closed = 1

var glist *g

// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, unsafe.Pointer(c))
}
gp.schedlink.set(glist)
glist = gp
}

// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, unsafe.Pointer(c))
}
gp.schedlink.set(glist)
glist = gp
}
unlock(&c.lock)

// Ready all Gs now that we've dropped the channel lock.
for glist != nil {
gp := glist
glist = glist.schedlink.ptr()
gp.schedlink = 0
goready(gp, 3)
}
}

nil channel

Closing an uninitialized (nil) channel panics.

if c == nil {
panic(plainError("close of nil channel"))
}

closed channel

if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}

Closing a channel that is already closed also panics. In practice, therefore, a channel is normally closed by a single producer.
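
When a single closing goroutine cannot be guaranteed, one common workaround is to guard the close with sync.Once. The sketch below first reproduces the double-close panic and then shows the guarded version; closeTwice and onceCloser are hypothetical names, and this is one possible approach rather than something taken from the runtime source.

```go
package main

import (
	"fmt"
	"sync"
)

// closeTwice closes the same channel twice and returns the recovered
// panic message from the second close.
func closeTwice() (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	ch := make(chan int)
	close(ch)
	close(ch) // panics
	return ""
}

// onceCloser is a hypothetical wrapper that makes Close idempotent.
type onceCloser struct {
	ch   chan int
	once sync.Once
}

func newOnceCloser() *onceCloser {
	return &onceCloser{ch: make(chan int)}
}

// Close closes the underlying channel at most once, however many
// times it is called.
func (o *onceCloser) Close() {
	o.once.Do(func() { close(o.ch) })
}

func main() {
	fmt.Println(closeTwice()) // close of closed channel

	o := newOnceCloser()
	o.Close()
	o.Close() // no panic: the second call is a no-op
	_, ok := <-o.ch
	fmt.Println(ok) // false: the channel is closed
}
```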

Normal close

When a channel is closed normally, closechan first sets the closed flag, then loops over recvq and sendq and collects the parked goroutines into the glist linked list, ready to be scheduled.

c.closed = 1

var glist *g

// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, unsafe.Pointer(c))
}
gp.schedlink.set(glist)
glist = gp
}

// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, unsafe.Pointer(c))
}
gp.schedlink.set(glist)
glist = gp
}

The goroutines in glist are then made runnable:

// Ready all Gs now that we've dropped the channel lock.
for glist != nil {
gp := glist
glist = glist.schedlink.ptr()
gp.schedlink = 0
// switch g to Grunnable and hand it to the scheduler
goready(gp, 3)
}

Specifically:

  • A goroutine waiting to receive has its data slot elem reset to the zero value (typedmemclr(c.elemtype, sg.elem)) and then continues running.
  • A goroutine waiting to send panics.
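
Because close wakes every parked receiver, it works as the broadcast mentioned earlier. A minimal sketch, in which the helper broadcast is a hypothetical name:

```go
package main

import (
	"fmt"
	"sync"
)

// broadcast starts n goroutines that all receive from done, closes done
// once, and returns how many goroutines were released.
func broadcast(n int) int {
	done := make(chan struct{})
	var wg sync.WaitGroup
	var mu sync.Mutex
	released := 0

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Returns once done is closed, whether this goroutine was
			// already parked on recvq or reaches the receive later.
			<-done
			mu.Lock()
			released++
			mu.Unlock()
		}()
	}

	close(done) // a single close releases every receiver
	wg.Wait()
	return released
}

func main() {
	fmt.Println(broadcast(5)) // 5
}
```

A single send would release only one receiver, which is why close is the usual way to signal "stop" to many goroutines at once.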
