特别提醒,本文所涉及的源码是go1.21.0 darwin/amd64
文件位置:runtime/trace/chan.go
1 基本数据结构
1.1hchan
Go语言的channel在运行时使用runtime.hchan结构体来表示,如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| type hchan struct { qcount uint dataqsiz uint buf unsafe.Pointer elemsize uint16 closed uint32 elemtype *_type sendx uint recvx uint recvq waitq sendq waitq
lock mutex }
|
1.2 waitq
1 2 3 4 5
| type waitq struct { first *sudog last *sudog }
|
1.3 sudog
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| type sudog struct { g *g next *sudog prev *sudog elem unsafe.Pointer acquiretime int64 releasetime int64 ticket uint32 isSelect bool success bool parent *sudog waitlink *sudog waittail *sudog c *hchan }
|
直接看数据结构的定义,可能会有点不理解,下面通过图来辅助理解

图片来源:https://halfrost.com/go_channel/#toc-6
其中sendx可能指向的是5边上那块还没有数据的区域,即如果执行 ch <- data$,那么data将被存储到这块区域。
同理recvx指向的是0,如果执行<-ch首先会得到 0,然后recvx会指向下一个
如果buf中存放的元素已经达到容量,此时还有g在执行发送操作给ch的话,即ch<-data则会被添加到sendq队列中去,直到通道中的元素不再是满的。
同理buf是空的, 此时还有g在执行接收操作的话,即<-ch则会被添加到recvq队列中去,直到通道中有元素。
2 创建 channel
创建channel的函数有两个,原型如下
1 2
| func makechan64(t *chantype, size int64) *hchan func makechan(t *chantype, size int) *hchan
|
makechan64方法只是判断一下传入的size是否在int32范围内
1 2 3 4 5 6 7
| func makechan64(t *chantype, size int64) *hchan { if int64(int(size)) != size { panic(plainError("makechan: size out of range")) }
return makechan(t, int(size)) }
|
主要的实现还是在makechan中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| func makechan(t *chantype, size int) *hchan { elem := t.Elem
if elem.Size_ >= 1<<16 { throw("makechan: invalid channel element type") } if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign { throw("makechan: bad alignment") } mem, overflow := math.MulUintptr(elem.Size_, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) } var c *hchan switch { case mem == 0: c = (*hchan)(mallocgc(hchanSize, nil, true)) c.buf = c.raceaddr() case elem.PtrBytes == 0: c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: c = new(hchan) c.buf = mallocgc(mem, elem, true) } c.elemsize = uint16(elem.Size_) c.elemtype = elem c.dataqsiz = uint(size) lockInit(&c.lock, lockRankHchan)
if debugChan { print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n") } return c }
|
3 发送数据
发送数据的操作有两个函数chansend1()和chansend(),不过实现逻辑都在后者,所以直接看它怎么实现的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { if c == nil { if !block { return false } gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2) throw("unreachable") } ... lock(&c.lock)
if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) } ... }
|
- 首先对
channel进行检查,如果已经被GC回收,往一个nil的通道发送数据会发送阻塞。gopark会引发以 waitReasonChanSendNilChan 为原因的休眠,并抛出一个unreachable的错误。
- 其次向已经关闭的
channel发送数据时会报”send on closed channel”的错误
下面将发送过程分为三种情况
3.1 写时存在阻塞的读协程——直接发送
1 2 3 4
| if sg := c.recvq.dequeue(); sg != nil { send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true }
|
如果存在阻塞的读协程时,会直接从recvq中取出最先陷入等待(遵循 FIFO 原则)的Goroutine并直接向它发送数据,绕过缓冲区

发送时的具体的执行逻辑由send()函数来完成:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if sg.elem != nil { sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } goready(gp, skip+1) }
|
send()函数主要完成了 2 件事情:
- 调用
sendDirect()函数将发送的数据直接拷贝到x = <-ch表达式中变量x所在的内存地址上,使用的是memmove()函数。
- 调用
goready()将等待接收的g的状态从Gwaiting或者Gscanwaiting变成Grunnable,并把该g放到runnext中,处理器在下一次调度时会立即运行它,具体逻辑可以看runqput()函数的代码。需要注意是,发送数据的过程并没有立即执行接收发的g,只是将g放到runnext中,下一次调度的时候再执行。
3.2 channel 带有缓冲区,空间还未满并且不存在阻塞的读协程
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| if c.qcount < c.dataqsiz { qp := chanbuf(c, c.sendx) if raceenabled { racenotify(c, c.sendx, nil) } typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true }
|
如果 qcount还没有满,则调用chanbuf() 获取 sendx 索引的元素指针值。调用 typedmemmove() 方法将发送的值拷贝到缓冲区 buf 中。拷贝完成,需要维护 sendx索引下标值和qcount 个数。这里将 buf 缓冲区设计成环形的,索引值如果到了队尾,下一个位置重新回到队头。

图片来源:https://halfrost.com/go_channel/#toc-10
3.3 channel 带有缓冲区,空间已满并且不存在阻塞的读协程
这时向channel发送数据会进入阻塞状态
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) gp.parkingOnChan.Store(true) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2) KeepAlive(ep)
|

图片来源:https://halfrost.com/go_channel/#toc-10
4 接收数据
接收数据的实现是在chanrecv()函数下实现的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| if c == nil { if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2) throw("unreachable") }
if c.closed != 0 { if c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } }
|
4.1 读时存在阻塞的写协程
1 2 3 4
| if sg := c.sendq.dequeue(); sg != nil { recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true }
|
- 从阻塞的
sendq中获取头部的g
- 如果缓冲区大小为 0,则直接读取写协程元素,并唤醒它,这种情况只会发生一次
copy操作即将阻塞的g所保存的元素复制到read

- 如果有缓冲区,则读取头部元素,并将处于等待发送的
g的元素写入缓冲区的尾部(即刚刚读取的位置,因为是环形的),这种情况会发送两次copy操作,先将缓冲区recvx处的元素拷贝到read,然后将阻塞的g的元素拷贝到sendx所处的地址。


图片来源:https://halfrost.com/go_channel/#toc-16
4.2 读取时无阻塞的写协程且缓冲区有元素
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| if c.qcount > 0 { qp := chanbuf(c, c.recvx) if raceenabled { racenotify(c, c.recvx, nil) } if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true }
|
直接将recvx处的元素拷贝给read,然后将对应的属性操作,qcount减 1,recvx++,如果等于容量,则归 0,可以理解为取模操作(recvx++)%qcount

4.3 读时没有阻塞的写协程且缓冲区无元素
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) gp.parkingOnChan.Store(true) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
|


图片来源https://halfrost.com/go_channel/#toc-16
5 关闭 Channel
关闭通道的实现在closechan()函数中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| if c == nil { panic(plainError("close of nil channel")) }
lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("close of closed channel")) }
if raceenabled { callerpc := getcallerpc() racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan)) racerelease(c.raceaddr()) }
c.closed = 1
|
当关闭channel是nil或者是一个已经关闭的channel时,会直接panic,当不存在这两种情况的时候,标记channel状态为close
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| var glist gList
for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) }
|
将recvq队列中的sudog加入到待清除的队列glist中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } unlock(&c.lock)
|
将sendq队列中的sudog加入到待清除的队列glist中,这里可能会产生panic

图片来源:https://halfrost.com/go_channel/#toc-16
1 2 3 4 5
| for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) }
|
最后会为所有被阻塞的 g 调用goready 触发调度。将所有glist中的g 状态从_Gwaiting设置为 _Grunnable状态,等待调度器的调度。
6 状态总结
| chan state |
nil |
open |
closed |
| send |
blocked |
allowed |
panic |
| receive |
blocked |
allowed |
allowed |
Reference