Go Concurrency Patterns

Do not communicate by sharing memory; instead, share memory by communicating. 并发编程的问题点总是和共享变量的方式有着重要而微妙的关系,Go+Pipeline 模式可以很好的处理并发设计。 本文参考下面文章,总结 Go 中的一些 goroutine 和 channel 的使用模式。

Pipeline and Filter 模式关键概念

将输入数据经过一系列处理,输出最终结果,就像是污水经过一系列管道最后流出自来水。其中处理过程的基础组件是过滤器(Filter 也叫 stage),可以通过各种 stage 的顺序组合变换出不同的结果。通常用于可以并行执行的计算过程。

  • stage/filter 每一节”水管”就是一个 stage,是基本组件单元,对输入进行一个运算处理
  • pipeline 某种 stage 的组合形成一个流水线,实现特定功能
  • inbound 入口,上游的数据进入 stage 时从入口取出
  • outbound 出口,流向下游的数据放入出口
  • source/producer 最上游的数据来源
  • sink/consumer 最下游的数据消费者

在 Go 中,由于 goroutine 和 channel 运行效率高、资源占用小,也可以方便的利用 Pipeline 进行程序设计。

基本示例

由三个 stage 组成的 pipeline,实现”取平方”功能

Fan-out/Fan-in

利用 goroutine 和 channel 可以实现数据的扇入、扇出。

// fan-in merge
func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

Explicit cancellation

有些情况下不需要读取所有的 inbound,可以提前获得结果并结束处理,比如中间发生了错误、已经计算出了正确值等,这种情况下可以增加一个 channel 作为结束信号。

func sq(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                return
            }
        }
    }()
    return out
}

Bounded parallelism

在一个三个 stage 组成的 pipeline 中,可以通过控制中间处理 stage 的实例数,来限制内存资源占用边界。

	// Start a fixed number of goroutines to read and digest files.
	c := make(chan result)
	var wg sync.WaitGroup
	const numDigesters = 20
	wg.Add(numDigesters)
	for i := 0; i < numDigesters; i++ {
		go func() {
			digester(done, paths, c)
			wg.Done()
		}()
	}
	go func() {
		wg.Wait()
		close(c)
	}()

A leaky buffer

一个简单的漏桶式缓冲区。

var freeList = make(chan *Buffer, 100)
var serverChan = make(chan *Buffer)

func client() {
    for {
        var b *Buffer
        // Grab a buffer if available; allocate if not.
        select {
        case b = <-freeList:
            // Got one; nothing more to do.
        default:
            // None free, so allocate a new one.
            b = new(Buffer)
        }
        load(b)              // Read next message from the net.
        serverChan <- b      // Send to server.
    }
}

func server() {
    for {
        b := <-serverChan    // Wait for work.
        process(b)
        // Reuse buffer if there's room.
        select {
        case freeList <- b:
            // Buffer on free list; nothing more to do.
        default:
            // Free list full, just carry on.
        }
    }
}s