特别提醒,本文所涉及的源码是go1.21.0 darwin/amd64
文件位置:runtime/trace/chan.go
1 基本数据结构
1.1hchan
Go
语言的channel
在运行时使用runtime.hchan
结构体来表示,如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| type hchan struct { qcount uint dataqsiz uint buf unsafe.Pointer elemsize uint16 closed uint32 elemtype *_type sendx uint recvx uint recvq waitq sendq waitq
lock mutex }
|
1.2 waitq
1 2 3 4 5
| type waitq struct { first *sudog last *sudog }
|
1.3 sudog
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| type sudog struct { g *g next *sudog prev *sudog elem unsafe.Pointer acquiretime int64 releasetime int64 ticket uint32 isSelect bool success bool parent *sudog waitlink *sudog waittail *sudog c *hchan }
|
直接看数据结构的定义,可能会有点不理解,下面通过图来辅助理解

图片来源:https://halfrost.com/go_channel/#toc-6
其中sendx
可能指向的是5
边上那块还没有数据的区域,即如果执行 ch <- data$,那么data
将被存储到这块区域。
同理recvx
指向的是0
,如果执行<-ch
首先会得到 0,然后recvx
会指向下一个
如果buf
中存放的元素已经达到容量,此时还有g
在执行发送操作给ch
的话,即ch<-data
则会被添加到sendq
队列中去,直到通道中的元素不再是满的。
同理buf
是空的, 此时还有g
在执行接收操作的话,即<-ch
则会被添加到recvq
队列中去,直到通道中有元素。
2 创建 channel
创建channel
的函数有两个,原型如下
1 2
| func makechan64(t *chantype, size int64) *hchan func makechan(t *chantype, size int) *hchan
|
makechan64
方法只是判断一下传入的size
是否在int32
范围内
1 2 3 4 5 6 7
| func makechan64(t *chantype, size int64) *hchan { if int64(int(size)) != size { panic(plainError("makechan: size out of range")) }
return makechan(t, int(size)) }
|
主要的实现还是在makechan
中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| func makechan(t *chantype, size int) *hchan { elem := t.Elem
if elem.Size_ >= 1<<16 { throw("makechan: invalid channel element type") } if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign { throw("makechan: bad alignment") } mem, overflow := math.MulUintptr(elem.Size_, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) } var c *hchan switch { case mem == 0: c = (*hchan)(mallocgc(hchanSize, nil, true)) c.buf = c.raceaddr() case elem.PtrBytes == 0: c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: c = new(hchan) c.buf = mallocgc(mem, elem, true) } c.elemsize = uint16(elem.Size_) c.elemtype = elem c.dataqsiz = uint(size) lockInit(&c.lock, lockRankHchan)
if debugChan { print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n") } return c }
|
3 发送数据
发送数据的操作有两个函数chansend1()
和chansend()
,不过实现逻辑都在后者,所以直接看它怎么实现的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { if c == nil { if !block { return false } gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2) throw("unreachable") } ... lock(&c.lock)
if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) } ... }
|
- 首先对
channel
进行检查,如果已经被GC
回收,往一个nil
的通道发送数据会发送阻塞。gopark
会引发以 waitReasonChanSendNilChan 为原因的休眠,并抛出一个unreachable
的错误。
- 其次向已经关闭的
channel
发送数据时会报”send on closed channel”的错误
下面将发送过程分为三种情况
3.1 写时存在阻塞的读协程——直接发送
1 2 3 4
| if sg := c.recvq.dequeue(); sg != nil { send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true }
|
如果存在阻塞的读协程时,会直接从recvq
中取出最先陷入等待(遵循 FIFO 原则)的Goroutine
并直接向它发送数据,绕过缓冲区

发送时的具体的执行逻辑由send()
函数来完成:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if sg.elem != nil { sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } goready(gp, skip+1) }
|
send()
函数主要完成了 2 件事情:
- 调用
sendDirect()
函数将发送的数据直接拷贝到x = <-ch
表达式中变量x
所在的内存地址上,使用的是memmove()
函数。
- 调用
goready()
将等待接收的g
的状态从Gwaiting
或者Gscanwaiting
变成Grunnable
,并把该g
放到runnext
中,处理器在下一次调度时会立即运行它,具体逻辑可以看runqput()
函数的代码。需要注意是,发送数据的过程并没有立即执行接收发的g
,只是将g
放到runnext
中,下一次调度的时候再执行。
3.2 channel 带有缓冲区,空间还未满并且不存在阻塞的读协程
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| if c.qcount < c.dataqsiz { qp := chanbuf(c, c.sendx) if raceenabled { racenotify(c, c.sendx, nil) } typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true }
|
如果 qcount
还没有满,则调用chanbuf()
获取 sendx
索引的元素指针值。调用 typedmemmove()
方法将发送的值拷贝到缓冲区 buf
中。拷贝完成,需要维护 sendx
索引下标值和qcount
个数。这里将 buf
缓冲区设计成环形的,索引值如果到了队尾,下一个位置重新回到队头。

图片来源:https://halfrost.com/go_channel/#toc-10
3.3 channel 带有缓冲区,空间已满并且不存在阻塞的读协程
这时向channel
发送数据会进入阻塞状态
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) gp.parkingOnChan.Store(true) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2) KeepAlive(ep)
|

图片来源:https://halfrost.com/go_channel/#toc-10
4 接收数据
接收数据的实现是在chanrecv()
函数下实现的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| if c == nil { if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2) throw("unreachable") }
if c.closed != 0 { if c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } }
|
4.1 读时存在阻塞的写协程
1 2 3 4
| if sg := c.sendq.dequeue(); sg != nil { recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true }
|
- 从阻塞的
sendq
中获取头部的g
- 如果缓冲区大小为 0,则直接读取写协程元素,并唤醒它,这种情况只会发生一次
copy
操作即将阻塞的g
所保存的元素复制到read

- 如果有缓冲区,则读取头部元素,并将处于等待发送的
g
的元素写入缓冲区的尾部(即刚刚读取的位置,因为是环形的),这种情况会发送两次copy
操作,先将缓冲区recvx
处的元素拷贝到read
,然后将阻塞的g
的元素拷贝到sendx
所处的地址。


图片来源:https://halfrost.com/go_channel/#toc-16
4.2 读取时无阻塞的写协程且缓冲区有元素
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| if c.qcount > 0 { qp := chanbuf(c, c.recvx) if raceenabled { racenotify(c, c.recvx, nil) } if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true }
|
直接将recvx
处的元素拷贝给read
,然后将对应的属性操作,qcount
减 1,recvx
++,如果等于容量,则归 0,可以理解为取模操作(recvx++)%qcount

4.3 读时没有阻塞的写协程且缓冲区无元素
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) gp.parkingOnChan.Store(true) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
|


图片来源https://halfrost.com/go_channel/#toc-16
5 关闭 Channel
关闭通道的实现在closechan()
函数中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| if c == nil { panic(plainError("close of nil channel")) }
lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("close of closed channel")) }
if raceenabled { callerpc := getcallerpc() racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan)) racerelease(c.raceaddr()) }
c.closed = 1
|
当关闭channel
是nil
或者是一个已经关闭的channel
时,会直接panic
,当不存在这两种情况的时候,标记channel
状态为close
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| var glist gList
for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) }
|
将recvq
队列中的sudog
加入到待清除的队列glist
中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } unlock(&c.lock)
|
将sendq
队列中的sudog
加入到待清除的队列glist
中,这里可能会产生panic

图片来源:https://halfrost.com/go_channel/#toc-16
1 2 3 4 5
| for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) }
|
最后会为所有被阻塞的 g
调用goready
触发调度。将所有glist
中的g
状态从_Gwaiting
设置为 _Grunnable
状态,等待调度器的调度。
6 状态总结
chan state |
nil |
open |
closed |
send |
blocked |
allowed |
panic |
receive |
blocked |
allowed |
allowed |
Reference