所以,如果我正确地解释了你的程序,这个程序应该可以工作。
所以,有两个发件人:
- 发件人 1 发送:0、1、2、...
- 发件人 2 发送:11、12、13、...
发送方现在将其发送给接收方,接收方总共监听 20 个数字然后停止(指示发送方和接收方也停止)。
接收者有责任将从发送者 goroutine 接收到的数据中继到打印机。
有两台打印机:
- 打印机 1 接收奇数并打印出来
- 打印机 2 接收偶数并打印出来
sender (2 instance) -> mediator (1 instance) -> printer (2 instance)
package main
import (
"fmt"
"sync"
)
type packet struct {
send chan int
recvOdd chan int
recvEven chan int
closed chan struct{}
}
func safeClose(ch chan int) {
_, ok := <-ch
if !ok {
return
}
close(ch)
}
// sender spawns multiple goroutines and keeps on
// sending numbers continuously in serial order until
// p.closed is closed. The goroutines have their own
// starting point.
// Example start = [0, 11]
// sender will send it in this order:
// go -> 0, 1, 2, 3 ... // "go" keyword means goroutine
// go -> 11, 12, 13, ... // "go" keyword means goroutine
func (p *packet) sender(wg *sync.WaitGroup, start ...int) {
wg.Add(len(start))
for _, sp := range start {
// Spawn!
go func(s int, w *sync.WaitGroup) {
defer w.Done()
// Start from "s", keep on incrementing
for num := s; ; num++ {
select {
// I'm done. Return
case <-p.closed:
return
// Send
case p.send <- num:
}
}
}(sp, wg)
}
}
// mediator receives upto a limit and then
// signals to close down. It also judges
// if the number received is even/ odd and
// sends to the respective channel.
func (p *packet) mediator(limit int) {
for i := 1; i <= limit; i++ {
r := <-p.send
if r%2 == 0 {
p.recvEven <- r
} else {
p.recvOdd <- r
}
}
// close down the channels.
p.close()
}
// close all channels except the send channel
// as there's a possibility that that send's goroutines
// might send to the closed channel which would cause panic.
// So, let send close the p.send channel.
func (p *packet) close() {
close(p.closed)
close(p.recvOdd)
close(p.recvEven)
}
// printer receives from printer
// and then stores it and prints it.
func (p *packet) printer() {
var odd, even = make(chan []int), make(chan []int)
go recv(odd, p.recvOdd)
go recv(even, p.recvEven)
oBuf, eBuf := <-odd, <-even
fmt.Println(oBuf)
fmt.Println(eBuf)
}
// recv just receives and keeps on bufferring.
// When finished receiving, it sends the buffer
// to res.
func recv(res chan<- []int, ch <-chan int) {
var buffer = make([]int, 0)
for c := range ch {
buffer = append(buffer, c)
}
res <- buffer
}
func main() {
// Declare and init a packet
var p packet
{
p.closed = make(chan struct{})
p.send = make(chan int, 1)
p.recvEven = make(chan int, 1)
p.recvOdd = make(chan int, 1)
}
defer close(p.send)
var wg sync.WaitGroup
defer wg.Wait()
// Setup a sender which sends values to the mediator.
// Internally it'll spawn two goroutines. One starts from 0,
// and the other from 11. Could be extended as well.
p.sender(&wg, 0, 11)
// Setup a mediator that mediates the data from sender
// to printer. Only sends 20 number of data.
go p.mediator(20)
// Wait for the printer to print and then return.
p.printer()
}