当使用通道来串行化任务时,我们因此创建了一个异步流。在异步编程中很容易搞砸,尤其是有额外的要求时,比如超时和取消。 真实世界的例子,是从文件读取器逐行读取数据,然后传递给另一个异步工作的文本解析器。
我找到了一个通用模式,可以用易于理解的代码覆盖大多数这样的场景。
异步生产者后跟同步消费者。
形如:
1go provider()
2consumer()
例如:
1var (
2 provider = make(chan int)
3 consumer = make(chan int)
4 ctx, cancel = context.WithTimeout(context.Background, time.Second)
5)
6defer cancel()
7
8// 生产者
9go func() {
10 defer close(consumer)
11
12 for {
13 select {
14 case <-ctx.Done():
15 return
16 case v, ok := <-provider:
17 if !ok {
18 return
19 }
20 consumer <- v
21 }
22 }
23}()
24
25consume:
26for {
27 select {
28 case <-ctx.Done():
29 return
30 case v, ok := <-consumer:
31 if !ok {
32 break consume
33 }
34 }
35}
36
37// 后续逻辑
这里,我们有一个生产者
和一个消费者
。生产者
是一个 goroutine,它从生产者
通道中读取数据并将其传递给消费者
通道。消费者
是一个阻塞的 for 循环,它从消费者
通道中读取数据。
1var err error
2
3consume:
4for {
5 select {
6 case <-ctx.Done():
7 return
8 case v, ok := <-consumer:
9 if !ok {
10 break consume
11 }
12
13 if err = process(v); err != nil {
14 break consume
15 }
16 }
17}
18
19// 后续逻辑
20if err != nil {
21 log.Error(err)
22}
祝我们在异步编程中好运!