import ( "fmt" "sync" ) funcmain() { var wg sync.WaitGroup wg.Add(2) ach, bch := make(chanint), make(chanint)
// 消费者 goroutine gofunc(wg *sync.WaitGroup, a, b <-chanint) { defer wg.Done() var ( name string x int ok bool ) for { select { case x, ok = <-a: name = "a" case x, ok = <-b: name = "b" } if !ok { // 如果没有数据发送,则跳出循环 return } fmt.Println(name, x) } }(&wg, ach, bch)
// 生产者 goroutine gofunc(wg *sync.WaitGroup, a, b chan<- int) { defer wg.Done() deferclose(a) deferclose(b) for i := 0; i < 10; i++ { select { case a <- i: case b <- i * 10: } } }(&wg, ach, bch)
wg.Wait() }
上述代码,分别创建了生产者和消费者的 goroutine,生产者会随机从 a b channel 中随机挑选一个发送消息,而消费者使用一个 for 循环来监控 a b channel,当 a b 其中一个接收到数据时,则指定对应内容.如果没有数据,则跳出循环,结束 goroutine.
应用示例
设置超时时间
1 2 3 4 5 6 7 8 9 10 11 12 13
ch := make(chanstruct{})
// finish task while send msg to ch go doTask(ch)
// 程序会在 5 s 内超时自动退出 timeout := time.After(5 * time.Second) select { case <- ch: fmt.Println("task finished.") case <- timeout: fmt.Println("task timeout.") }
设置退出信号
1 2 3 4 5 6 7 8 9 10
msgCh := make(chanstruct{}) quitCh := make(chanstruct{}) for { select { case <- msgCh: doSomeWork() case <- quitCh: finish() return }
按指定顺序在 goroutine 中循环打印内容
如下是启动了 3 个 goroutine 按顺序分别打印不同的内容,每个 goroutine 打印 n 次的代码实现.
funcf(wg *sync.WaitGroup, counter, n int64, in <-chanbool, out chan<- bool, message string) { for { // if 必须在 <- in 之前进行判断,否则 return 时会发生死锁 if counter >= n { wg.Done() return// 别忘了 return } x := <-in // 使用 x 作为启动函数的标志,信号进入到此函数 fmt.Println(message) atomic.AddInt64(&counter, 1) // 原子操作,协程数据安全 out <- x // 信号离开此函数,并进入到下一个函数 } }
funcmain() { var ( dogCh = make(chanbool, 1) // 必须定义为缓冲区为 1 的 channel,用于保存启动信号/标志 catCh = make(chanbool, 1) fishCh = make(chanbool, 1) wg sync.WaitGroup n int64 = 3 counter int64 = 0 ) dogCh <- true//启动的信号 wg.Add(3) go f(&wg, 0, n, dogCh, catCh, "dog") go f(&wg, 0, n, catCh, fishCh, "cat") go f(&wg, 0, n, fishCh, dogCh, "fish") wg.Wait() }
type hchan struct { qcount uint// total data in the queue 队列中所有数据的总数 dataqsiz uint// size of the circular queue 环形队列的大小,由 make 初始化时的 size 决定 buf unsafe.Pointer // points to an array of dataqsiz elements 指向大小为 dataqsiz 数组的指针 elemsize uint16// 元素大小 closed uint32// 是否关闭 elemtype *_type // element type 元素数据类型,由 make 初始化时的 元素类型决定 sendx uint// send index 发送索引 recvx uint// receive index 接收索引 recvq waitq // list of recv waiters recv 等待列表,即 <-ch sendq waitq // list of send waiters send 等待列表,即 ch<-
// lock protects all fields in hchan, as well as several // fields in sudogs blocked on this channel. // // Do not change another G's status while holding this lock // (in particular, do not ready a G), as this can deadlock // with stack shrinking. // Trans: lock 保护了 hchan 的所有字段,以及在此 channel 上阻塞的 sudog 的一些字段 // 当持有此锁时不应改变其它 G 的状态,因为它在栈收缩时会发生死锁 lock mutex // 锁 }
// 可以理解为由封装了 goroutine 的 sudog 组成的双向环形链表, type waitq struct { first *sudog last *sudog }
// ${GOROOT}/src/runtime/runtime2.go // 双向环形链表的元素结构体,内部封装了 goroutine 的指针 type sudog struct { // The following fields are protected by the hchan.lock of the // channel this sudog is blocking on. shrinkstack depends on // this for sudogs involved in channel ops. g *g // goroutine 的指针 next *sudog // 链表的下一个节点 prev *sudog // 前一个节点 elem unsafe.Pointer // 数据元素 data element (may point to stack) // The following fields are never accessed concurrently. // For channels, waitlink is only accessed by g. // For semaphores, all fields (including the ones above) // are only accessed when holding a semaRoot lock. acquiretime int64 releasetime int64 ticket uint32 // isSelect indicates g is participating in a select, so // g.selectDone must be CAS'd to win the wake-up race. isSelect bool parent *sudog // semaRoot binary tree waitlink *sudog // g.waiting list or semaRoot waittail *sudog // semaRoot c *hchan // channel }
var c *hchan switch { case mem == 0: // 环形队列容量大小为 0, make(chan interface{}) c = (*hchan)(mallocgc(hchanSize, nil, true)) c.buf = c.raceaddr() case elem.ptrdata == 0: // Elements do not contain pointers. // Allocate hchan and buf in one call. // 元素不包含指针时,分配连续的内存 c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) // 为 channel 和 环形队列分配连续的内存 c.buf = add(unsafe.Pointer(c), hchanSize) default: // Elements contain pointers. c = new(hchan) c.buf = mallocgc(mem, elem, true) }
// case2: 如果队列未满,则将其移动到队列中,并使 send 索引和队列中元素数加1 if c.qcount < c.dataqsiz { // Space is available in the channel buffer. Enqueue the element to send. qp := chanbuf(c, c.sendx) if raceenabled { raceacquire(qp) racerelease(qp) } typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) returntrue }
if !block { unlock(&c.lock) returnfalse }
// case3: 缓存队列已满,将 goroutine 加入缓存队列
// Block on the channel. Some receiver will complete our operation for us. gp := getg() // 获取 sudog.这里点进去会发现,获取 sudog 的方式为 优先从全局拿,数量为本地缓存队列空间的一半.如果没有,则创建新的 sudog mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil // 将 sudog 添加到 队列中 c.sendq.enqueue(mysg) // Signal to anyone trying to shrink our stack that we're about // to park on a channel. The window between when this G's status // changes and when we set gp.activeStackChans is not safe for // stack shrinking. atomic.Store8(&gp.parkingOnChan, 1) // 阻塞,等待重新被调度后继续从此位置开始执行 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) // Ensure the value being sent is kept alive until the // receiver copies it out. The sudog has a pointer to the // stack object, but sudogs aren't considered as roots of the // stack tracer. KeepAlive(ep)
// someone woke us up. if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false if gp.param == nil { if c.closed == 0 { throw("chansend: spurious wakeup") } panic(plainError("send on closed channel")) } gp.param = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } mysg.c = nil // 释放 sudog releaseSudog(mysg) returntrue }
funcchanrecv(c *hchan, ep unsafe.Pointer, block bool)(selected, received bool) { // raceenabled: don't need to check ep, as it is always on the stack // or is new memory allocated by reflect.
if debugChan { print("chanrecv: chan=", c, "\n") }
// fastpath: 快速检测 channel 是否为 nil if c == nil { if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") }
// Fast path: 快速检查失败的非阻塞操作 if !block && empty(c) { // channel 是否已经被关闭 if atomic.Load(&c.closed) == 0 { return } // 快速检查是否有数据 if empty(c) { // The channel is irreversibly closed and empty. if raceenabled { raceacquire(c.raceaddr()) } if ep != nil { typedmemclr(c.elemtype, ep) } returntrue, false } }
var t0 int64 if blockprofilerate > 0 { t0 = cputicks() }
// 加锁 lock(&c.lock)
// 再次进行检查 if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } returntrue, false }
// case1: 如果 channel 的发送列中已经有 sender 的 goroutine 等待发送,则 // 当没有缓冲队列时(c.dataqsiz == 0),直接从等待的 goroutine 中接收数据 // 当缓冲队列已满,则从 channel 的缓冲区队列头部获取数据,并复制发送者数据到缓冲队列中,并使得 send/recv 索引自增. // 最终解锁并使得 sender 的 goroutine 处于 ready 状态 if sg := c.sendq.dequeue(); sg != nil { // Found a waiting sender. If buffer is size 0, receive value // directly from sender. Otherwise, receive from head of queue // and add sender's value to the tail of the queue (both map to // the same buffer slot because the queue is full). recv(c, sg, ep, func() { unlock(&c.lock) }, 3) returntrue, true }
// case2: channel 缓冲队列不为空,则直接从缓冲队列中获取数据,并使得 recv 索引自增,qcount 元素个数自减 if c.qcount > 0 { // Receive directly from queue qp := chanbuf(c, c.recvx) if raceenabled { raceacquire(qp) racerelease(qp) } if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) returntrue, true }
if !block { unlock(&c.lock) returnfalse, false }
// no sender available: block on this channel. // case3: 缓存队列为空,将 receiver 的 goroutine 加入 recvq 队列,并阻塞.细节基本与发送数据的 case3 一致 gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) // Signal to anyone trying to shrink our stack that we're about // to park on a channel. The window between when this G's status // changes and when we set gp.activeStackChans is not safe for // stack shrinking. atomic.Store8(&gp.parkingOnChan, 1) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// someone woke us up if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } closed := gp.param == nil gp.param = nil mysg.c = nil releaseSudog(mysg) returntrue, !closed }