一种处理 Golang 中链式通道的方法

一种处理 Golang 中链式通道的方法

当使用通道来串行化任务时,我们因此创建了一个异步流。在异步编程中很容易搞砸,尤其是有额外的要求时,比如超时和取消。 真实世界的例子,是从文件读取器逐行读取数据,然后传递给另一个异步工作的文本解析器。

我找到了一个通用模式,可以用易于理解的代码覆盖大多数这样的场景。

异步生产者后跟同步消费者。

形如:

1go provider()
2consumer()

例如:

 1var (
 2  provider         = make(chan int)
 3  consumer         = make(chan int)
 4  ctx, cancel      = context.WithTimeout(context.Background, time.Second)
 5)
 6defer cancel()
 7
 8// 生产者
 9go func() {
10    defer close(consumer)
11
12    for {
13        select {
14        case <-ctx.Done():
15            return
16        case v, ok := <-provider:
17            if !ok {
18                return
19            }
20            consumer <- v
21        }
22    }
23}()
24
25consume:
26for {
27    select {
28    case <-ctx.Done():
29        return
30    case v, ok := <-consumer:
31        if !ok {
32            break consume
33        }
34    }
35}
36
37// 后续逻辑

这里,我们有一个生产者和一个消费者生产者是一个 goroutine,它从生产者通道中读取数据并将其传递给消费者通道。消费者是一个阻塞的 for 循环,它从消费者通道中读取数据。

 1var err error
 2
 3consume:
 4for {
 5    select {
 6    case <-ctx.Done():
 7        return
 8    case v, ok := <-consumer:
 9        if !ok {
10            break consume
11        }
12
13        if err = process(v); err != nil {
14            break consume
15        }
16    }
17}
18
19// 后续逻辑
20if err != nil {
21    log.Error(err)
22}

祝我们在异步编程中好运!