A Brief Look at Go Channels: How the Basic Operations Are Implemented

CSP

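Go channels follow the CSP model. CSP (Communicating Sequential Processes) is a model for describing concurrent systems, proposed by the British computer scientist Tony Hoare in 1978. In CSP, processes communicate by passing messages rather than by sharing memory, which makes concurrent programs more intuitive and easier to reason about. Go's concurrency primitives, goroutines and channels, were inspired by this model.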
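To make the idea concrete, here is a minimal sketch of message passing between two goroutines. The function name sumViaChannel is hypothetical. The producer never touches the consumer's variables; values flow only through the channel, and close signals that no more values are coming.

```go
package main

import "fmt"

// sumViaChannel starts a producer goroutine that sends 1..n over an
// unbuffered channel; the caller receives and sums them. The two goroutines
// share no variables: all data passes through the channel.
func sumViaChannel(n int) int {
	ch := make(chan int)
	go func() {
		for i := 1; i <= n; i++ {
			ch <- i
		}
		close(ch) // tell the consumer that nothing more will be sent
	}()
	sum := 0
	for v := range ch { // the loop ends once ch is closed and drained
		sum += v
	}
	return sum
}

func main() {
	fmt.Println(sumViaChannel(10)) // 55
}
```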

Data Structures

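// channel data structure
type hchan struct {
    qcount   uint           // number of elements currently in the queue
    dataqsiz uint           // length of the circular queue, i.e. how many elements it can hold
    buf      unsafe.Pointer // pointer to the circular queue
    elemsize uint16         // size of each element
    closed   uint32         // non-zero once the channel has been closed
    timer    *timer         // timer feeding this channel, if any
    elemtype *_type         // element type
    sendx    uint           // queue index where the next sent element is written
    recvx    uint           // queue index from which the next element is read
    recvq    waitq          // queue of goroutines waiting to receive
    sendq    waitq          // queue of goroutines waiting to send
    lock mutex              // protects the fields above, so sends, receives and close on one channel are serialized
}

// waitq holds pointers to the head and tail runtime.sudog, forming a linked list
type waitq struct {
  first *sudog
  last  *sudog
}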
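The way buf, dataqsiz, sendx, recvx and qcount cooperate can be illustrated with a small user-space ring buffer. This is a minimal sketch under stated assumptions: the type ring and its methods are hypothetical, there is no lock and no wait queue, and a plain []int stands in for the raw buf memory (len(buf) plays the role of dataqsiz).

```go
package main

import "fmt"

// ring is a hypothetical model of the hchan buffer fields. It only shows how
// the indices move and wrap; it does not block or synchronize.
type ring struct {
	buf    []int
	sendx  uint // next slot to write
	recvx  uint // next slot to read
	qcount uint // number of elements currently stored
}

func newRing(size int) *ring { return &ring{buf: make([]int, size)} }

// push mirrors the buffered branch of chansend: write at sendx, advance, wrap.
func (r *ring) push(v int) bool {
	if r.qcount >= uint(len(r.buf)) {
		return false // full: the real runtime would block or fail here
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == uint(len(r.buf)) {
		r.sendx = 0
	}
	r.qcount++
	return true
}

// pop mirrors the buffered branch of chanrecv: read at recvx, clear, advance, wrap.
func (r *ring) pop() (int, bool) {
	if r.qcount == 0 {
		return 0, false
	}
	v := r.buf[r.recvx]
	r.buf[r.recvx] = 0 // like typedmemclr: drop the stale copy
	r.recvx++
	if r.recvx == uint(len(r.buf)) {
		r.recvx = 0
	}
	r.qcount--
	return v, true
}

func main() {
	r := newRing(3)
	r.push(1)
	r.push(2)
	r.push(3)
	fmt.Println(r.push(4)) // false: the buffer is full
	v, _ := r.pop()
	fmt.Println(v, r.recvx, r.sendx) // 1 1 0
	r.push(4)                        // wraps around into slot 0
	fmt.Println(r.buf, r.sendx)      // [4 2 3] 1
}
```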

Basic Operations

Creating a Channel

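Every channel in Go is created with the make keyword. The compiler turns an expression such as make(chan int, 10) into an OMAKE node, and during type checking converts the OMAKE node into an OMAKECHAN node. This phase also checks the buffer-size argument: if no buffer size is passed, it defaults to 0, meaning the channel has no buffer. Before SSA generation, every OMAKECHAN node is lowered into a direct or indirect call to runtime.makechan.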
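The effect of the default buffer size, and the range check inside makechan, can be observed from user code. A minimal sketch: the helper names bufferSizes and makeWithSize are hypothetical. The size in makeWithSize is only known at run time, so a negative value is rejected by runtime.makechan with the panic shown in the code below rather than by the compiler.

```go
package main

import "fmt"

// bufferSizes shows that omitting the size argument yields an unbuffered
// channel (capacity 0), while make(chan int, 10) reserves 10 slots.
func bufferSizes() (unbufCap, bufCap, bufLen int) {
	unbuffered := make(chan int)   // size omitted: defaults to 0
	buffered := make(chan int, 10) // dataqsiz == 10
	buffered <- 1                  // one element queued, qcount == 1
	return cap(unbuffered), cap(buffered), len(buffered)
}

// makeWithSize creates a channel whose size is only known at run time and
// returns the recovered panic message if makechan rejects it.
func makeWithSize(n int) (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	ch := make(chan int, n)
	return fmt.Sprintf("ok (cap %d)", cap(ch))
}

func main() {
	fmt.Println(bufferSizes())    // 0 10 1
	fmt.Println(makeWithSize(4))  // ok (cap 4)
	fmt.Println(makeWithSize(-1)) // makechan: size out of range
}
```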

func makechan(t *chantype, size int) *hchan {
  elem := t.Elem

  // preliminary sanity checks (elided)
  ...

  // compute the memory needed for the buffer: element size * capacity
  mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
  if overflow || mem > maxAlloc-hchanSize || size < 0 {
    panic(plainError("makechan: size out of range"))
  }

  var c *hchan
  switch {
  case mem == 0:
    // Queue or element size is zero.
    c = (*hchan)(mallocgc(hchanSize, nil, true))
    // Race detector uses this location for synchronization.
    c.buf = c.raceaddr()
  case !elem.Pointers():
    // Elements do not contain pointers.
    // Allocate hchan and buf in one call.
    c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
    c.buf = add(unsafe.Pointer(c), hchanSize)
  default:
    // Elements contain pointers.
    c = new(hchan)
    c.buf = mallocgc(mem, elem, true)
  }

  c.elemsize = uint16(elem.Size_)
  c.elemtype = elem
  c.dataqsiz = uint(size)
  lockInit(&c.lock, lockRankHchan)

  ...

  return c
}

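makechan decides how to allocate memory based on the element type and the buffer length:

  • If the buffer length is 0 or the element type has size 0 (so mem == 0), only the runtime.hchan struct itself is allocated;
  • If the element type contains no pointers, runtime.hchan and the underlying buffer are allocated as one contiguous block; since the block is allocated with a nil type, the garbage collector does not need to scan the buffer;
  • Otherwise, runtime.hchan and the buffer are allocated separately, and the buffer is allocated with the element type so the garbage collector can scan the pointers stored in it.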

Sending Data

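In program code, data is sent with the statement ch <- elem. The compiler parses this into an OSEND node and eventually lowers it into a call to runtime.chansend1, which calls chansend with block set to true. A select statement with a single send case and a default branch is lowered to runtime.selectnbsend instead, which calls chansend with block set to false. chansend then proceeds as follows: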
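Several of the outcomes of chansend described in the steps below are visible from ordinary Go code. A minimal sketch: trySend and sendOnClosed are hypothetical helper names. trySend relies on the select-with-default form, which reaches chansend with block == false.

```go
package main

import "fmt"

// trySend performs a non-blocking send (select with one send case plus
// default, i.e. chansend with block == false).
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

// sendOnClosed returns the message of the panic raised by chansend's
// closed-channel check.
func sendOnClosed() (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	ch := make(chan int, 1)
	close(ch)
	ch <- 1
	return "no panic"
}

func main() {
	buffered := make(chan int, 1)
	fmt.Println(trySend(buffered, 1))       // true: room in the buffer
	fmt.Println(trySend(buffered, 2))       // false: buffer full, no receiver
	fmt.Println(trySend(make(chan int), 1)) // false: unbuffered, no receiver
	fmt.Println(sendOnClosed())             // send on closed channel
}
```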

  1. Check whether the channel has been closed; sending on a closed channel panics:
    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
      ...
      if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
      }
      ...
    }
    
  2. Check whether a receiver is already waiting in recvq; if so, copy the value directly to it and wake it:
    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
      ...
      if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
      }
      ...
    }
    
  3. If the buffer holds fewer elements than its capacity (qcount < dataqsiz), write the element into the buffer slot at sendx:
    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
      ...
      if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx)
        if raceenabled {
          racenotify(c, c.sendx, nil)
        }
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
          c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
      }
      ...
    }
    
  4. If this is a non-blocking send (block == false) and none of the above applied, unlock and return false immediately:
    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
      ...
      if !block {
        unlock(&c.lock)
        return false
      }
      ...
    }
    
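  5. If none of the conditions above holds, the goroutine wraps itself in a sudog, enqueues it on sendq, and is parked by gopark until a receiver (or closechan) wakes it: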
    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
      ...
      // Block on the channel. Some receiver will complete our operation for us.
      gp := getg()
      mysg := acquireSudog()
      mysg.releasetime = 0
      if t0 != 0 {
        mysg.releasetime = -1
      }
      // No stack splits between assigning elem and enqueuing mysg
      // on gp.waiting where copystack can find it.
      mysg.elem = ep
      mysg.waitlink = nil
      mysg.g = gp
      mysg.isSelect = false
      mysg.c = c
      gp.waiting = mysg
      gp.param = nil
      c.sendq.enqueue(mysg)
      // Signal to anyone trying to shrink our stack that we're about
      // to park on a channel. The window between when this G's status
      // changes and when we set gp.activeStackChans is not safe for
      // stack shrinking.
      gp.parkingOnChan.Store(true)
      gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
      ...
    }
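The five steps can be condensed into a toy model that records what chansend would do instead of actually blocking. This is a sketch under loud assumptions: toyChan and waiter are hypothetical types, the model is single-threaded with no lock, a slice stands in for the ring buffer, and "parking" just appends to a slice.

```go
package main

import "fmt"

// waiter stands in for a sudog parked in recvq.
type waiter struct {
	got int // value handed over directly by a sender
}

// toyChan is a hypothetical, single-threaded model of hchan used only to
// show the order of checks in chansend. Nothing really blocks.
type toyChan struct {
	closed bool
	buf    []int // queued values; cap(buf) plays the role of dataqsiz
	recvq  []*waiter
	sendq  []int // values of "parked" senders
}

// send follows chansend's order: closed check, waiting receiver,
// buffer space, non-blocking bail-out, and finally parking.
func (c *toyChan) send(v int, block bool) string {
	if c.closed { // step 1
		panic("send on closed channel")
	}
	if len(c.recvq) > 0 { // step 2: bypass the buffer
		w := c.recvq[0]
		c.recvq = c.recvq[1:]
		w.got = v
		return "handed to receiver"
	}
	if len(c.buf) < cap(c.buf) { // step 3
		c.buf = append(c.buf, v)
		return "buffered"
	}
	if !block { // step 4
		return "would block"
	}
	c.sendq = append(c.sendq, v) // step 5: recorded instead of gopark
	return "parked"
}

func main() {
	c := &toyChan{buf: make([]int, 0, 1)} // capacity-1 channel
	w := &waiter{}
	c.recvq = append(c.recvq, w) // one receiver already waiting
	fmt.Println(c.send(1, true))  // handed to receiver
	fmt.Println(w.got)            // 1
	fmt.Println(c.send(2, true))  // buffered
	fmt.Println(c.send(3, false)) // would block
	fmt.Println(c.send(4, true))  // parked
}
```

Checking for a waiting receiver before the buffer mirrors the runtime: a receiver can only be parked when there is nothing to read, so the direct handoff saves one copy through the buffer.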
    

Receiving Data

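In program code, data is received with the statement elem := <-ch. The compiler parses this into an ORECV node and eventually lowers it into a call to runtime.chanrecv1 (or runtime.chanrecv2 for the v, ok := <-ch form), both of which call chanrecv with block set to true. A select with a single receive case and a default branch is lowered to runtime.selectnbrecv, which calls chanrecv with block set to false.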
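Two receive behaviors covered by the steps below are easy to observe: values buffered before close are still delivered, and a non-blocking receive on a nil channel returns instead of hanging. A minimal sketch; drainAfterClose and tryRecvNil are hypothetical helper names.

```go
package main

import "fmt"

// drainAfterClose shows that values buffered before close are still
// delivered; only afterwards does a receive yield the zero value and ok == false.
func drainAfterClose() []string {
	ch := make(chan int, 2)
	ch <- 7
	ch <- 8
	close(ch)
	var out []string
	for i := 0; i < 3; i++ {
		v, ok := <-ch // the v, ok form goes through runtime.chanrecv2
		out = append(out, fmt.Sprintf("%d %t", v, ok))
	}
	return out
}

// tryRecvNil performs a non-blocking receive on a nil channel; it takes
// chanrecv's early return for c == nil && !block instead of blocking forever.
func tryRecvNil() bool {
	var ch chan int // nil channel
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println(drainAfterClose()) // [7 true 8 true 0 false]
	fmt.Println(tryRecvNil())      // false
}
```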

  1. Receiving from a nil channel: a non-blocking receive returns immediately, while a blocking receive calls runtime.gopark to give up execution and sleeps forever.
    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
      ...
      if c == nil {
       if !block {
        return
       }
       gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
       throw("unreachable")
      }
    }
    
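  2. For a non-blocking receive on an empty channel, chanrecv first takes a lock-free fast path:
    1. Check whether the channel is closed. If it is still open, return immediately with (false, false): the receive cannot proceed.
    2. If it is closed, check again whether the channel is empty, because data may have arrived between the two checks. If it is still empty, the channel is closed with nothing left to receive, so chanrecv zeroes the memory pointed to by ep (the receiver's destination) and returns (true, false). If data did arrive, execution falls through to the locked path.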
    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
      ...
      // Fast path: check for failed non-blocking operation without acquiring the lock.
      if !block && empty(c) {
       // After observing that the channel is not ready for receiving, we observe whether the
       // channel is closed.
       //
       // Reordering of these checks could lead to incorrect behavior when racing with a close.
       // For example, if the channel was open and not empty, was closed, and then drained,
       // reordered reads could incorrectly indicate "open and empty". To prevent reordering,
       // we use atomic loads for both checks, and rely on emptying and closing to happen in
       // separate critical sections under the same lock.  This assumption fails when closing
       // an unbuffered channel with a blocked send, but that is an error condition anyway.
       if atomic.Load(&c.closed) == 0 {
        // Because a channel cannot be reopened, the later observation of the channel
        // being not closed implies that it was also not closed at the moment of the
        // first observation. We behave as if we observed the channel at that moment
        // and report that the receive cannot proceed.
        return
       }
       // The channel is irreversibly closed. Re-check whether the channel has any pending data
       // to receive, which could have arrived between the empty and closed checks above.
       // Sequential consistency is also required here, when racing with such a send.
       if empty(c) {
        // The channel is irreversibly closed and empty.
        if raceenabled {
         raceacquire(c.raceaddr())
        }
        if ep != nil {
         typedmemclr(c.elemtype, ep)
        }
        return true, false
       }
      }
    }
    
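  3. Under the lock, if the channel has been closed and its buffer holds no data, chanrecv zeroes the memory pointed to by ep and returns (true, false) immediately.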
    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
      ...
      if c.closed != 0 {
        if c.qcount == 0 {
          if raceenabled {
            raceacquire(c.raceaddr())
          }
          unlock(&c.lock)
          if ep != nil {
            typedmemclr(c.elemtype, ep)
          }
          return true, false
        }
        // The channel has been closed, but the channel's buffer have data.
      }
      ...
    }
    
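  4. If the channel is not closed and a sender is waiting in sendq, chanrecv calls runtime.recv, which takes the data either from the blocked sender or from the buffer:
    • If the buffer size is 0: runtime.recvDirect copies the elem held by the waiting sender's sudog straight into the receiver's destination;
    • If the buffer size is not 0 (a waiting sender implies the buffer is full): the element at the head of the buffer is copied to the receiver, and the waiting sender's value is copied into the slot just freed, which is also the tail of the queue;
    • In both cases the sender's goroutine, which was sleeping, is then woken with goready.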
    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
      ...
      if c.closed != 0 {
        ...
      } else {
        // Just found waiting sender with not closed.
        if sg := c.sendq.dequeue(); sg != nil {
          // Found a waiting sender. If buffer is size 0, receive value
          // directly from sender. Otherwise, receive from head of queue
          // and add sender's value to the tail of the queue (both map to
          // the same buffer slot because the queue is full).
          recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
          return true, true
        }
      }
      ...
    }
    
    
    func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
      // unbuffered channel (buffer size 0):
      // copy straight from the sender via recvDirect
      if c.dataqsiz == 0 {
       if raceenabled {
        racesync(c, sg)
       }
       if ep != nil {
        // copy data from sender
        recvDirect(c.elemtype, sg, ep)
       }
      } else {
       // buffered channel (buffer size > 0)
       // Queue is full. Take the item at the
       // head of the queue. Make the sender enqueue
       // its item at the tail of the queue. Since the
       // queue is full, those are both the same slot.
       qp := chanbuf(c, c.recvx)
       if raceenabled {
        racenotify(c, c.recvx, nil)
        racenotify(c, c.recvx, sg)
       }
       // copy the head element from the buffer to the receiver
       if ep != nil {
        typedmemmove(c.elemtype, ep, qp)
       }
       // copy the sender's value into the buffer tail (the same slot)
       typedmemmove(c.elemtype, qp, sg.elem)
       c.recvx++
       if c.recvx == c.dataqsiz {
         c.recvx = 0
       }
       c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
      }
      sg.elem = nil
      gp := sg.g
      unlockf()
      gp.param = unsafe.Pointer(sg)
      sg.success = true
      if sg.releasetime != 0 {
       sg.releasetime = cputicks()
      }
      goready(gp, skip+1)
    }
    
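  5. If there is no waiting sender but the buffer holds data, chanrecv takes the element at recvx directly from the buffer. This also covers a channel that has been closed but whose buffer has not been drained yet.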
    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
      ...
      if c.qcount > 0 {
        // Receive directly from queue
        qp := chanbuf(c, c.recvx)
        if raceenabled {
          racenotify(c, c.recvx, nil)
        }
        if ep != nil {
          typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
          c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
      }
      ...
    }
    
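  6. If none of the above applies, a non-blocking receive returns (false, false) immediately; a blocking receiver is enqueued on recvq and parks until a sender delivers a value or the channel is closed.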
    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
      ...
      if !block {
        unlock(&c.lock)
        return false, false
      }
    
      // no sender available: block on this channel.
      gp := getg()
      mysg := acquireSudog()
      mysg.releasetime = 0
      if t0 != 0 {
        mysg.releasetime = -1
      }
      // No stack splits between assigning elem and enqueuing mysg
      // on gp.waiting where copystack can find it.
      mysg.elem = ep
      mysg.waitlink = nil
      gp.waiting = mysg
    
      mysg.g = gp
      mysg.isSelect = false
      mysg.c = c
      gp.param = nil
      c.recvq.enqueue(mysg)
      if c.timer != nil {
        blockTimerChan(c)
      }
    
      // Signal to anyone trying to shrink our stack that we're about
      // to park on a channel. The window between when this G's status
      // changes and when we set gp.activeStackChans is not safe for
      // stack shrinking.
      gp.parkingOnChan.Store(true)
      gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
      ...
    }
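The buffered branch of runtime.recv (step 4) relies on a neat invariant: when a sender is waiting, the buffer is full, so sendx == recvx and the head slot being read is also the tail slot the sender should write. A hedged sketch of just that branch: fullRing and its methods are hypothetical, plain ints stand in for element memory, and no goroutine is actually woken.

```go
package main

import "fmt"

// fullRing models a full hchan buffer (qcount == dataqsiz), where
// sendx == recvx always holds.
type fullRing struct {
	buf   []int
	recvx int
	sendx int
}

// recvWithWaitingSender mirrors the dataqsiz != 0 branch of runtime.recv:
// take the head element, store the parked sender's value in the same slot,
// advance recvx and set sendx = recvx. qcount does not change.
func (r *fullRing) recvWithWaitingSender(senderVal int) int {
	qp := r.recvx
	v := r.buf[qp]        // copy the head element to the receiver
	r.buf[qp] = senderVal // copy the sender's value into the freed slot
	r.recvx++
	if r.recvx == len(r.buf) {
		r.recvx = 0
	}
	r.sendx = r.recvx
	return v
}

// drain lists the buffered elements in FIFO order starting at recvx.
func (r *fullRing) drain() []int {
	out := make([]int, 0, len(r.buf))
	for i := 0; i < len(r.buf); i++ {
		out = append(out, r.buf[(r.recvx+i)%len(r.buf)])
	}
	return out
}

func main() {
	// The buffer holds 1, 2, 3 (full); a sender is parked trying to send 4.
	r := &fullRing{buf: []int{1, 2, 3}}
	fmt.Println(r.recvWithWaitingSender(4))  // 1
	fmt.Println(r.drain(), r.recvx, r.sendx) // [2 3 4] 1 1
}
```

Reusing the freed slot keeps FIFO order intact: the parked sender's value lands behind every element that was already buffered.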
    

Closing a Channel

In program code, a channel is closed with the statement close(ch). The compiler parses this into an OCLOSE node and eventually lowers it into a call to runtime.closechan.
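The two panics raised at the start of closechan can be observed with recover. A minimal sketch; closeAndRecover is a hypothetical helper name, and the messages it returns are the plainError strings from closechan.

```go
package main

import "fmt"

// closeAndRecover closes ch and returns the recovered panic message,
// or "ok" if close succeeded.
func closeAndRecover(ch chan int) (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	close(ch)
	return "ok"
}

func main() {
	var nilCh chan int
	fmt.Println(closeAndRecover(nilCh)) // close of nil channel
	ch := make(chan int)
	fmt.Println(closeAndRecover(ch)) // ok
	fmt.Println(closeAndRecover(ch)) // close of closed channel
}
```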

  1. Closing a nil channel or an already-closed channel panics; otherwise the channel is first marked as closed:
    func closechan(c *hchan) {
      if c == nil {
        panic(plainError("close of nil channel"))
      }
    
      lock(&c.lock)
      if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
      }
      ...
      c.closed = 1
      ...
    }
    
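  2. Release every waiting receiver: its destination memory is zeroed, success is set to false, and its goroutine is pushed onto a gList: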
    func closechan(c *hchan) {
      ...
      var glist gList
    
      // release all readers
      for {
        sg := c.recvq.dequeue()
        if sg == nil {
          break
        }
        if sg.elem != nil {
          typedmemclr(c.elemtype, sg.elem)
          sg.elem = nil
        }
        if sg.releasetime != 0 {
          sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = unsafe.Pointer(sg)
        sg.success = false
        if raceenabled {
          raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
      }
      ...
    }
    
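  3. Release every waiting sender and push it onto the gList as well. Note that a sender goroutine checks mysg.success after it is woken in chansend; a sender woken by closechan finds success == false, meaning its value was never delivered, and panics with "send on closed channel":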
    func closechan(c *hchan) {
      ...
      // release all writers (they will panic)
      for {
        sg := c.sendq.dequeue()
        if sg == nil {
          break
        }
        sg.elem = nil
        if sg.releasetime != 0 {
          sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = unsafe.Pointer(sg)
        sg.success = false
        if raceenabled {
          raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
      }
      ...
    }
    
    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
      ...
      // someone woke us up.
      if mysg != gp.waiting {
        throw("G waiting list is corrupted")
      }
      gp.waiting = nil
      gp.activeStackChans = false
      closed := !mysg.success
      gp.param = nil
      if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
      }
      mysg.c = nil
      releaseSudog(mysg)
      if closed {
        if c.closed == 0 {
          throw("chansend: spurious wakeup")
        }
        panic(plainError("send on closed channel"))
      }
      return true
    }
    
  4. After dropping the channel lock, mark every goroutine collected in the gList as ready so the scheduler can run it:
    func closechan(c *hchan) {
      ...
      // Ready all Gs now that we've dropped the channel lock.
      for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
      }
    }
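The net effect of closechan can be seen from user code: close acts as a broadcast to all receivers, while a sender still blocked on the channel panics. A hedged sketch; broadcastClose and blockedSenderPanics are hypothetical helpers. Whether a goroutine had already parked (and is woken by the loops above) or had not yet reached its channel operation (and hits the closed check directly), the observable outcome is the same, so the results do not depend on scheduling.

```go
package main

import (
	"fmt"
	"sync"
)

// broadcastClose starts n receivers on an unbuffered channel that nobody
// sends on, then closes it. Every receiver observes ok == false.
func broadcastClose(n int) int {
	ch := make(chan int)
	var wg sync.WaitGroup
	var mu sync.Mutex
	closedSeen := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, ok := <-ch; !ok {
				mu.Lock()
				closedSeen++
				mu.Unlock()
			}
		}()
	}
	close(ch) // wakes all parked receivers in one call
	wg.Wait()
	return closedSeen
}

// blockedSenderPanics starts a sender on an unbuffered channel with no
// receiver, closes the channel, and returns the sender's recovered panic.
func blockedSenderPanics() string {
	ch := make(chan int)
	result := make(chan string, 1)
	go func() {
		defer func() { result <- fmt.Sprint(recover()) }()
		ch <- 1 // can never complete: there is no receiver
	}()
	close(ch)
	return <-result
}

func main() {
	fmt.Println(broadcastClose(5))     // 5
	fmt.Println(blockedSenderPanics()) // send on closed channel
}
```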