Go中的并发模式:生产者-消费者模式详解与实现
字数 742 2025-12-13 02:01:33
Go中的并发模式:生产者-消费者模式详解与实现
题目描述:
生产者-消费者模式是Go并发编程中的经典设计模式,用于协调多个Goroutine之间的数据生产和消费。这个模式涉及两种角色的Goroutine:生产者负责生成数据并放入共享缓冲区,消费者负责从缓冲区取出并处理数据。在Go中,通常使用Channel作为缓冲区来实现这个模式,但需要处理好同步、关闭和并发安全等问题。
解题过程:
1. 基本模式实现
先看最简单的生产者-消费者模型实现:
package main
import (
"fmt"
"sync"
)
func main() {
data := make(chan int, 5) // 缓冲区大小为5
var wg sync.WaitGroup
// 生产者
wg.Add(1)
go func() {
defer wg.Done()
defer close(data) // 生产者完成后关闭通道
for i := 0; i < 10; i++ {
data <- i
fmt.Printf("生产者发送: %d\n", i)
}
}()
// 消费者
wg.Add(1)
go func() {
defer wg.Done()
for num := range data { // 自动检测通道关闭
fmt.Printf("消费者接收: %d\n", num)
}
}()
wg.Wait()
}
关键点详解:
- 使用带缓冲的Channel(
make(chan int, 5))作为共享缓冲区 - 生产者完成后调用
close(data)关闭通道 - 消费者使用
for range循环自动检测通道关闭 - 使用
sync.WaitGroup同步Goroutine完成
2. 多生产者多消费者模式
实际场景通常有多个生产者和消费者:
func main() {
data := make(chan int, 10)
var wg sync.WaitGroup
// 3个生产者
for i := 0; i < 3; i++ {
wg.Add(1)
producerID := i
go func() {
defer wg.Done()
for j := 0; j < 5; j++ {
value := producerID*100 + j
data <- value
fmt.Printf("生产者%d发送: %d\n", producerID, value)
}
}()
}
// 等待所有生产者完成,然后关闭通道
go func() {
wg.Wait()
close(data)
}()
// 2个消费者
var consumerWg sync.WaitGroup
for i := 0; i < 2; i++ {
consumerWg.Add(1)
consumerID := i
go func() {
defer consumerWg.Done()
for num := range data {
fmt.Printf("消费者%d接收: %d\n", consumerID, num)
}
}()
}
consumerWg.Wait()
}
3. 优雅关闭通道机制
多生产者场景下,需要安全的关闭通道:
func main() {
data := make(chan int, 10)
stopChan := make(chan struct{}) // 停止信号
var wg sync.WaitGroup
// 多个生产者
for i := 0; i < 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; ; j++ {
select {
case <-stopChan: // 收到停止信号
return
case data <- id*1000 + j:
time.Sleep(time.Millisecond * 10)
}
}
}(i)
}
// 控制逻辑:运行一段时间后停止
go func() {
time.Sleep(time.Second)
close(stopChan) // 发送停止信号
wg.Wait() // 等待所有生产者完成
close(data) // 安全关闭数据通道
}()
// 消费者
for num := range data {
fmt.Printf("接收: %d\n", num)
}
}
4. 使用context控制生命周期
更现代的关闭方式使用context:
func producer(ctx context.Context, data chan<- int, id int) {
for i := 0; ; i++ {
select {
case <-ctx.Done(): // 收到取消信号
fmt.Printf("生产者%d退出\n", id)
return
case data <- id*1000 + i:
time.Sleep(time.Millisecond * 50)
}
}
}
func consumer(ctx context.Context, data <-chan int, id int) {
for {
select {
case <-ctx.Done():
fmt.Printf("消费者%d退出\n", id)
return
case num, ok := <-data:
if !ok { // 通道已关闭
fmt.Printf("消费者%d: 通道已关闭\n", id)
return
}
fmt.Printf("消费者%d接收: %d\n", id, num)
}
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
data := make(chan int, 20)
// 启动多个生产者和消费者
for i := 0; i < 3; i++ {
go producer(ctx, data, i)
}
for i := 0; i < 2; i++ {
go consumer(ctx, data, i)
}
<-ctx.Done()
time.Sleep(time.Millisecond * 100) // 给goroutine退出时间
close(data) // 关闭通道
}
5. 带错误处理的完整实现
实际生产中需要处理错误:
type Task struct {
ID int
Data string
Error error
}
func main() {
tasks := make(chan Task, 100)
results := make(chan Task, 100)
errChan := make(chan error, 10)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 生产者
go func() {
defer close(tasks)
for i := 0; i < 100; i++ {
task := Task{
ID: i,
Data: fmt.Sprintf("data-%d", i),
}
select {
case <-ctx.Done():
return
case tasks <- task:
}
}
}()
// 消费者池
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for task := range tasks {
// 模拟处理
if task.ID%10 == 0 { // 模拟错误
task.Error = fmt.Errorf("处理失败")
errChan <- task.Error
}
select {
case <-ctx.Done():
return
case results <- task:
}
}
}(i)
}
// 等待消费者完成,然后关闭结果通道
go func() {
wg.Wait()
close(results)
close(errChan)
}()
// 结果处理器
go func() {
for result := range results {
if result.Error != nil {
fmt.Printf("任务%d失败: %v\n", result.ID, result.Error)
} else {
fmt.Printf("任务%d完成\n", result.ID)
}
}
}()
// 错误监控
go func() {
for err := range errChan {
fmt.Printf("发生错误: %v\n", err)
if someCondition(err) {
cancel() // 严重错误时取消所有操作
}
}
}()
time.Sleep(time.Second * 2)
}
6. 性能优化考虑
实际使用时需要考虑性能优化:
type Pool struct {
work chan func()
workers int
wg sync.WaitGroup
}
func NewPool(size int) *Pool {
p := &Pool{
work: make(chan func()),
workers: size,
}
p.start()
return p
}
func (p *Pool) start() {
for i := 0; i < p.workers; i++ {
p.wg.Add(1)
go func() {
defer p.wg.Done()
for task := range p.work {
task()
}
}()
}
}
func (p *Pool) Submit(task func()) {
p.work <- task
}
func (p *Pool) Stop() {
close(p.work)
p.wg.Wait()
}
最佳实践总结:
- 缓冲区大小选择:根据生产消费速度差选择合适的缓冲区大小
- 优雅关闭:使用context或单独的信号通道协调关闭
- 错误处理:建立错误通道收集处理错误
- 资源控制:使用Worker Pool控制并发数
- 监控:添加metrics监控生产消费速率
- 背压:缓冲区满时应有处理策略(阻塞、丢弃、扩容等)
这个模式的核心是通过Channel实现Goroutine间的安全通信,结合select、context等机制实现优雅的控制流程。