channel的基本概念
1. channel的基本概念
channel,通道。golang中用于数据传递的一种数据结构, 是golang中一种传递数据的方式,常用于goroutine之间的通信。 像管道一样,一个goroutineA向channelA中放数据,另外一个goroutineB从channelA中取数据。
2. channel的申明、传值、关闭
channel是指针类型的数据类型,可通过make来分配内存。例如:ch := make(chan int)
,这表示创建一个channel,这个channel中只能保存int类型的数据。也就是说一端只能向此channel中放进int类型的值,另一端只能从此channel中读出int类型的值。
使用chan关键字声明一个通道,在使用前必须先创建,操作符 <-
用于指定通道的方向,发送或接收。如果未指定方向,则为双向通道。
1 | //声明和创建 |
3. buffer channel和unbuffer channel
channel分为两种:unbuffered channel和buffered channel
unbuffered channel:阻塞、同步模式
- sender端向channel中send一个数据,然后阻塞,直到receiver端将此数据receive
- receiver端一直阻塞,直到sender端向channel发送了一个数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19package main
import (
"fmt"
)
var done chan bool
func HelloWorld() {
fmt.Println("Hello world goroutine")
done <- true
}
func main() {
done = make(chan bool) // 创建一个channel
go HelloWorld()
<-done
}
// output "hello world goroutine"buffered channel:非阻塞、异步模式
- sender端可以向channel中send多个数据(只要channel容量未满),容量满之前不会阻塞
- receiver端按照队列的方式(FIFO,先进先出)从buffered channel中按序receive其中数据
1
2
3ch := make(chan string, 3) // 创建了缓冲区为3的通道
fmt.Println(len(ch))
fmt.Println(cap(ch))
4. channel foreach(遍历)
可以通过for无限循环来读取channel中的数据,但是可以使用range来迭代channel, 它会返回每次迭代过程中所读取的数据,直到channel被关闭。
注意: 只要channel未关闭,range迭代channel就会一直被阻塞。
1 | channel := make(chan int, 10) |
5. channel死锁
把数据往通道中发送时,如果接收方一直都没有接收,那么发送操作将持续阻塞。Go 程序运行时能智能地发现一些永远无法发送成功的语句并报错, 例如以下情形将将出现deadlock:
只要所有goroutine都被阻塞(包括main函数),就会出现死锁
1 | fatal error: all goroutines are asleep - deadlock! |
实例一:
1 | package main |
执行这段代码后将会出现如下错误:
1 | fatal error: all goroutines are asleep - deadlock! |
出现deadlock的原因是因为ch<-"hello world"
这行导致了main goroutine被挂起,导致fmt.Println(<ch)
这一行得不到执行,且没有任何其他goroutine去消费这个channel, 因此造成了deadlock。
示例二:
1 | package main |
执行完这段代码后,也会出现死锁的情况如下:
1 | hello |
出现这样的结果是因为通道实际上是类型化消息的队列,它是先进先出(FIFO
)的结构,可以保证发送给它们的元素的顺序。所以上面代码只取出了第一次传的值,即”hello”,而第二次传入的值没有一个配对的接收者来接收,因此就出现了deadlock
。
示例三:
1 | package main |
执行完这段代码也会出现deadlock的情况如下:
1 | hello |
出现上面的结果是因为for
循环一直在获取通道中的值,但是在读取完 hello和
world后,通道中没有新的值传入,这样接收者就阻塞了。
channel原理解析
channel的结构如下所示(channel本质是消息传递的数据结构):
1 | type hchan struct { |
buf
:指向底层循环数组,只有缓冲型的 channel 才有。sendx
,recvx
:指向底层循环数组,表示当前可以发送和接收的元素位置索引值(相对于底层数组)sendq
,recvq
:表示被阻塞的 goroutine,这些 goroutine 由于尝试读取 channel 或向 channel 发送数据而被阻塞。lock
:用来保证每个读 channel 或写 channel 的操作都是原子的。
waitq是
sudog的一个双向链表,而
sudog` 实际上是对 goroutine 的一个封装:
1 | type waitq struct { |
例如,创建一个容量为 6 的,元素为 int 型的 channel 数据结构如下 :
创建channel
创建channel 在底层使用的是函数makechan
:
1 | func makechan(t *chantype, size int64) *hchan |
从函数原型来看,创建的 chan 是一个指针。所以我们能在函数间直接传递 channel,而不用传递 channel 的指针。
具体代码如下:
1 | const hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1)) |
说完channel的结构后,以一个例子说明channel如何接收和发送的
1 | func goroutineA(a <-chan int) { |
接收
channel的接收操作有两种写法,一种带 “ok”,反应 channel 是否关闭;一种不带 “ok”,这种写法,当接收到相应类型的零值时无法知道是真实的发送者发送过来的值,还是 channel 被关闭后,返回给接收者的默认类型的零值。两种写法,都有各自的应用场景。
1 | result := <-ch |
经过编译器的处理后,这两种写法最后对应源码里的这两个函数:
1 | // entry points for <- c from compiled code |
chanrecv1
函数处理不带 “ok” 的情形,chanrecv2
则通过返回 “received” 这个字段来反应 channel 是否被关闭。接收值则比较特殊,会“放到”参数 elem
所指向的地址了,这很像 C/C++ 里的写法。如果代码里忽略了接收值,这里的 elem 为 nil。
无论如何,最终转向了 chanrecv
函数:
1 | // 位于 src/runtime/chan.go |
- 如果 channel 是一个空值(nil),在非阻塞模式下,会直接返回。在阻塞模式下,会调用 gopark 函数挂起 goroutine,这个会一直阻塞下去。因为在 channel 是 nil 的情况下,要想不阻塞,只有关闭它,但关闭一个 nil 的 channel 又会发生 panic,所以没有机会被唤醒了。
- 和发送函数一样,接下来搞了一个在非阻塞模式下,不用获取锁,快速检测到失败并且返回的操作。顺带插一句,我们平时在写代码的时候,找到一些边界条件,快速返回,能让代码逻辑更清晰,因为接下来的正常情况就比较少,更聚焦了,看代码的人也更能专注地看核心代码逻辑了。
当我们观察到 channel 没准备好接收:
- 非缓冲型,等待发送列队里没有 goroutine 在等待
- 缓冲型,但 buf 里没有元素
之后,又观察到 closed == 0,即 channel 未关闭。
因为 channel 不可能被重复打开,所以前一个观测的时候, channel 也是未关闭的,因此在这种情况下可以直接宣布接收失败,快速返回。因为没被选中,也没接收到数据,所以返回值为 (false, false)。
- 接下来的操作,首先会上一把锁,粒度比较大。如果 channel 已关闭,并且循环数组 buf 里没有元素。对应非缓冲型关闭和缓冲型关闭但 buf 无元素的情况,返回对应类型的零值,但 received 标识是 false,告诉调用者此 channel 已关闭,你取出来的值并不是正常由发送者发送过来的数据。但是如果处于 select 语境下,这种情况是被选中了的。很多将 channel 用作通知信号的场景就是命中了这里。
- 接下来,如果有等待发送的队列,说明 channel 已经满了,要么是非缓冲型的 channel,要么是缓冲型的 channel,但 buf 满了。这两种情况下都可以正常接收数据。
于是,调用 recv 函数:
1 | func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { |
如果是非缓冲型的,就直接从发送者的栈拷贝到接收者的栈。否则,就是缓冲型 channel,而 buf 又满了的情形。说明发送游标和接收游标重合了,因此需要先找到接收游标:
1 | func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) { |
将该处的元素拷贝到接收地址。然后将发送者待发送的数据拷贝到接收游标处。这样就完成了接收数据和发送数据的操作。接着,分别将发送游标和接收游标向前进一,如果发生“环绕”,再从 0 开始。
最后,取出 sudog 里的 goroutine,调用 goready 将其状态改成 “runnable”,待发送者被唤醒,等待调度器的调度。
- 然后,如果 channel 的 buf 里还有数据,说明可以比较正常地接收。注意,这里,即使是在 channel 已经关闭的情况下,也是可以走到这里的。这一步比较简单,正常地将 buf 里接收游标处的数据拷贝到接收数据的地址。
- 到了最后一步,走到这里来的情形是要阻塞的。当然,如果 block 传进来的值是 false,那就不阻塞,直接返回就好了。
先构造一个 sudog,接着就是保存各种值了。注意,这里会将接收数据的地址存储到了 elem
字段,当被唤醒时,接收到的数据就会保存到这个字段指向的地址。然后将 sudog 添加到 channel 的 recvq 队列里。调用 goparkunlock 函数将 goroutine 挂起。
我们继续之前的例子。前面说到第 14 行,创建了一个非缓冲型的 channel,接着,第 15、16 行分别创建了一个 goroutine,各自执行了一个接收操作。通过前面的源码分析,我们知道,这两个 goroutine (后面称为 G1 和 G2 好了)都会被阻塞在接收操作。G1 和 G2 会挂在 channel 的 recq 队列中,形成一个双向循环链表。因此此时chan的状态如下:
G1 和 G2 被挂起了,状态是 WAITING
, goroutine 是用户态的协程,由 Go runtime 进行管理,作为对比,内核线程由 OS 进行管理。Goroutine 更轻量,因此我们可以轻松创建数万 goroutine。一个内核线程可以管理多个 goroutine,当其中一个 goroutine 阻塞时,内核线程可以调度其他的 goroutine 来运行,内核线程本身不会阻塞。这就是通常我们说的 M:N
模型:
M:N
模型通常由三部分构成:M、P、G。M 是内核线程,负责运行 goroutine;P 是 context,保存 goroutine 运行所需要的上下文,它还维护了可运行(runnable)的 goroutine 列表;G 则是待运行的 goroutine。M 和 P 是 G 运行的基础。
继续回到例子。假设我们只有一个 M,当 G1(go goroutineA(ch)
) 运行到 val := <- a
时,它由本来的 running 状态变成了 waiting 状态(调用了 gopark 之后的结果):
G1 脱离与 M 的关系,但调度器可不会让 M 闲着,所以会接着调度另一个 goroutine 来运行:
G2 也是同样的遭遇。现在 G1 和 G2 都被挂起了,等待着一个 sender 往 channel 里发送数据,才能得到解救。
发送
第 17 行向 channel 发送了一个元素 3。发送操作最终转化为 chansend
函数
1 | // 位于 src/runtime/chan.go |
上面的代码注释地比较详细了,我们来详细看看。
- 如果检测到 channel 是空的,当前 goroutine 会被挂起。
- 对于不阻塞的发送操作,如果 channel 未关闭并且没有多余的缓冲空间(说明:a. channel 是非缓冲型的,且等待接收队列里没有 goroutine;b. channel 是缓冲型的,但循环数组已经装满了元素)
如果检测到 channel 已经关闭,直接 panic。
如果能从等待接收队列 recvq 里出队一个 sudog(代表一个 goroutine),说明此时 channel 是空的,没有元素,所以才会有等待接收者。这时会调用 send 函数将元素直接从发送者的栈拷贝到接收者的栈,关键操作由 sendDirect
函数完成。
1 | // send 函数处理向一个空的 channel 发送操作 |
然后,解锁、唤醒接收者,等待调度器的光临,接收者也得以重见天日,可以继续执行接收操作之后的代码了。
在发送小节里我们说到 G1 和 G2 现在被挂起来了,等待 sender 的解救。在第 17 行,主协程向 ch 发送了一个元素 3,来看下接下来会发生什么。
根据前面源码分析的结果,我们知道,sender 发现 ch 的 recvq 里有 receiver 在等待着接收,就会出队一个 sudog,把 recvq 里 first 指针的 sudo “推举”出来了,并将其加入到 P 的可运行 goroutine 队列中。
然后,sender 把发送元素拷贝到 sudog 的 elem 地址处,最后会调用 goready 将 G1 唤醒,状态变为 runnable。
channel触发panic的三种情况
- 向一个关闭的 channel 进行写操作(读操作不会触发panic,会返回channel 类型的零值)
- 关闭一个 nil 的 channel
- 重复关闭一个 channel。
参考文档
- Go基础系列:channel入门
- Go Channel 详解 //鸟窝大佬
- go中通道channel的使用及原理 // 推荐阅读
- 深度解密Go语言之channel // 强烈推荐阅读绕全成大佬的所有文章
- golang runtime源码阅读 channal实现 // 这篇文章也挺不错的