特别提醒,本文所涉及的源码是go1.21.0 darwin/amd64

文件位置:runtime/trace/chan.go

1 基本数据结构

1.1hchan

Go语言的channel在运行时使用runtime.hchan结构体来表示,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type hchan struct {
qcount uint // 已经存放的元素个数
dataqsiz uint // 容量
buf unsafe.Pointer // 存放元素的环形缓冲区
elemsize uint16 // 元素类型的大小
closed uint32 // channel是否关闭
elemtype *_type // 元素类型
sendx uint // channel发送操作处理的位置
recvx uint // channel接收操作处理的位置
recvq waitq // 因接收而陷入阻塞的协程队列,即( <-ch )
sendq waitq // 因发送而陷入阻塞的协程队列,即( ch<- )

lock mutex
}

1.2 waitq

1
2
3
4
5
// 阻塞的协程队列,是一个双向链表
type waitq struct {
first *sudog // 表头
last *sudog // 表尾
}

1.3 sudog

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type sudog struct {
g *g
next *sudog
prev *sudog
elem unsafe.Pointer // 指向数据元素 (可能指向栈)
acquiretime int64
releasetime int64
ticket uint32
isSelect bool
success bool
parent *sudog // semaRoot 二叉树
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
  • sudog:对goroutine的封装,表示一个在等待队列中的g

  • g:协程

  • next:链表的下一个节点
  • prev:链表的前一个节点
  • elem:指向数据
  • isSelect:表示g是否被选择
  • success:表示channelc 上的通信是否成功

直接看数据结构的定义,可能会有点不理解,下面通过图来辅助理解

image-20230825143137015

图片来源:https://halfrost.com/go_channel/#toc-6

其中sendx可能指向的是5边上那块还没有数据的区域,即如果执行 ch <- data$,那么data将被存储到这块区域。

同理recvx指向的是0,如果执行<-ch首先会得到 0,然后recvx会指向下一个

如果buf中存放的元素已经达到容量,此时还有g在执行发送操作给ch的话,即ch<-data则会被添加到sendq队列中去,直到通道中的元素不再是满的。

同理buf是空的, 此时还有g在执行接收操作的话,即<-ch则会被添加到recvq队列中去,直到通道中有元素。

2 创建 channel

创建channel的函数有两个,原型如下

1
2
func makechan64(t *chantype, size int64) *hchan
func makechan(t *chantype, size int) *hchan

makechan64方法只是判断一下传入的size是否在int32范围内

1
2
3
4
5
6
7
func makechan64(t *chantype, size int64) *hchan {
if int64(int(size)) != size {
panic(plainError("makechan: size out of range"))
}

return makechan(t, int(size))
}

主要的实现还是在makechan

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func makechan(t *chantype, size int) *hchan {
elem := t.Elem

// 编译器检查单个元素的大小不能超过64KB
if elem.Size_ >= 1<<16 {
throw("makechan: invalid channel element type")
}
// 检查是否对齐
if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign {
throw("makechan: bad alignment")
}
// 缓冲区大小检查,判断是否溢出
mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch {
case mem == 0:
// 队列或者元素大小为0时
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Rac 竞争检查使用这个地址进行同步操作
c.buf = c.raceaddr()
case elem.PtrBytes == 0:
// 元素不包含指针时,一次性分配hchan和buf的内存
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 元素包含指针时,则分别申请hchan和buf的空间,两者无需连续
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// 将其余字段赋初始值
c.elemsize = uint16(elem.Size_)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)

if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n")
}
return c
}

3 发送数据

发送数据的操作有两个函数chansend1()chansend(),不过实现逻辑都在后者,所以直接看它怎么实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 判断 channel 是否为nil
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
throw("unreachable")
}
...
lock(&c.lock)

if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
...
}
  • 首先对channel进行检查,如果已经被GC回收,往一个nil的通道发送数据会发送阻塞。gopark会引发以 waitReasonChanSendNilChan 为原因的休眠,并抛出一个unreachable的错误。
  • 其次向已经关闭的channel发送数据时会报”send on closed channel”的错误

下面将发送过程分为三种情况

3.1 写时存在阻塞的读协程——直接发送

1
2
3
4
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}

如果存在阻塞的读协程时,会直接从recvq中取出最先陷入等待(遵循 FIFO 原则)的Goroutine并直接向它发送数据,绕过缓冲区

image-20230825220928571

发送时的具体的执行逻辑由send()函数来完成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}

send()函数主要完成了 2 件事情:

  1. 调用sendDirect()函数将发送的数据直接拷贝到x = <-ch表达式中变量x所在的内存地址上,使用的是memmove()函数。
  2. 调用goready()将等待接收的g的状态从Gwaiting或者Gscanwaiting变成Grunnable,并把该g放到runnext中,处理器在下一次调度时会立即运行它,具体逻辑可以看runqput()函数的代码。需要注意是,发送数据的过程并没有立即执行接收发的g,只是将g放到runnext中,下一次调度的时候再执行。

3.2 channel 带有缓冲区,空间还未满并且不存在阻塞的读协程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}

如果 qcount还没有满,则调用chanbuf() 获取 sendx 索引的元素指针值。调用 typedmemmove() 方法将发送的值拷贝到缓冲区 buf 中。拷贝完成,需要维护 sendx索引下标值和qcount 个数。这里将 buf 缓冲区设计成环形的,索引值如果到了队尾,下一个位置重新回到队头。

image-20230826133707785

图片来源:https://halfrost.com/go_channel/#toc-10

3.3 channel 带有缓冲区,空间已满并且不存在阻塞的读协程

这时向channel发送数据会进入阻塞状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
KeepAlive(ep)
  • 调用getg()方法获取当前g的指针,绑定到一个sudog
  • 调用acquireSudog()方法获取一个sudog

  • 调用c.sendq.enqueue()方法将配置好的sudog加入队列

  • 调用gopark()挂起当前g,状态为waitReasonChanSend,阻塞等待channel发送
  • 调用KeepAlive()保持活动状态,等待其它协程取走元素

image-20230826135153228

图片来源:https://halfrost.com/go_channel/#toc-10

4 接收数据

接收数据的实现是在chanrecv()函数下实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 从一个为nil的通道中接收数据会发生异常
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
throw("unreachable")
}
// 通道已经关闭且不存在缓存数据也会异常
if c.closed != 0 {
if c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}

4.1 读时存在阻塞的写协程

1
2
3
4
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
  • 从阻塞的sendq中获取头部的g
  • 如果缓冲区大小为 0,则直接读取写协程元素,并唤醒它,这种情况只会发生一次copy操作即将阻塞的g所保存的元素复制到read

image-20230826151104338

  • 如果有缓冲区,则读取头部元素,并将处于等待发送的g的元素写入缓冲区的尾部(即刚刚读取的位置,因为是环形的),这种情况会发送两次copy操作,先将缓冲区recvx处的元素拷贝到read,然后将阻塞的g的元素拷贝到sendx所处的地址。

image-20230826150504865

image-20230826151355877

图片来源:https://halfrost.com/go_channel/#toc-16

4.2 读取时无阻塞的写协程且缓冲区有元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}

直接将recvx处的元素拷贝给read,然后将对应的属性操作,qcount减 1,recvx++,如果等于容量,则归 0,可以理解为取模操作(recvx++)%qcount

image-20230826151746532

4.3 读时没有阻塞的写协程且缓冲区无元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
  • 调用getg()方法获取当前g的指针,绑定到一个sudog
  • 调用acquireSudog()方法获取一个sudog

  • 调用c.recvq.enqueue()方法将配置好的sudog加入队列

  • 调用gopark()挂起当前g,状态为waitReasonChanReceive,阻塞等待channel接收

image-20230826152412808

image-20230826152442123

图片来源https://halfrost.com/go_channel/#toc-16

5 关闭 Channel

关闭通道的实现在closechan()函数中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
if c == nil {
panic(plainError("close of nil channel"))
}

lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}

if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
racerelease(c.raceaddr())
}

c.closed = 1

当关闭channelnil或者是一个已经关闭的channel时,会直接panic,当不存在这两种情况的时候,标记channel状态为close

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
var glist gList
// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}

recvq队列中的sudog加入到待清除的队列glist

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)

sendq队列中的sudog加入到待清除的队列glist中,这里可能会产生panic

image-20230826153540944

图片来源:https://halfrost.com/go_channel/#toc-16

1
2
3
4
5
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}

最后会为所有被阻塞的 g 调用goready 触发调度。将所有glist中的g 状态从_Gwaiting设置为 _Grunnable状态,等待调度器的调度。

6 状态总结

chan state nil open closed
send blocked allowed panic
receive blocked allowed allowed

Reference