Go中的并发模式:Worker Pool模式详解与实现
字数 912 2025-11-17 03:02:16
Go中的并发模式:Worker Pool模式详解与实现
1. Worker Pool模式简介
Worker Pool(工作池) 是一种并发设计模式,用于限制并发执行的Goroutine数量,避免资源耗尽。其核心思想是预先创建一组固定的Worker Goroutine,通过任务队列接收任务,实现任务的并发处理与负载均衡。
适用场景:
- 高并发任务处理(如HTTP请求、文件处理)
- 控制资源消耗(避免无限创建Goroutine)
- 需要任务队列缓冲或优先级调度
2. Worker Pool的核心组件
- 任务队列(Task Queue):
- 通道(Channel)用于传递任务,可以是带缓冲的通道以实现队列功能。
- 工作者(Worker):
- 一组长期运行的Goroutine,从任务队列中获取任务并执行。
- 任务提交器(Task Dispatcher):
- 向任务队列提交任务的外部接口。
3. 实现步骤详解
步骤1:定义任务类型
任务通常是一个函数或闭包,通过通道传递。例如:
type Task func() // 任务类型为无参数函数
步骤2:创建任务队列
使用带缓冲的通道作为任务队列,缓冲大小决定队列容量:
taskQueue := make(chan Task, 100) // 缓冲100个任务
步骤3:启动Worker Goroutine
每个Worker循环从任务队列中读取任务并执行:
func startWorker(id int, taskQueue <-chan Task) {
for task := range taskQueue { // 循环读取任务,队列关闭后退出
fmt.Printf("Worker %d processing task\n", id)
task() // 执行任务
}
}
步骤4:初始化Worker Pool
根据预设的Worker数量启动多个Worker:
func createWorkerPool(numWorkers int, taskQueue <-chan Task) {
for i := 1; i <= numWorkers; i++ {
go startWorker(i, taskQueue)
}
}
步骤5:提交任务
通过通道向任务队列发送任务:
func submitTask(taskQueue chan<- Task, task Task) {
taskQueue <- task
}
步骤6:关闭队列与资源清理
任务提交完成后,关闭任务队列以通知Worker退出:
close(taskQueue) // 关闭通道,Worker自动退出循环
4. 完整代码示例
package main
import (
"fmt"
"sync"
"time"
)
type Task func()
func main() {
const numWorkers = 3
const taskQueueSize = 10
taskQueue := make(chan Task, taskQueueSize)
var wg sync.WaitGroup
// 启动Worker Pool
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for task := range taskQueue { // 队列关闭后自动退出
fmt.Printf("Worker %d started task\n", workerID)
task()
fmt.Printf("Worker %d completed task\n", workerID)
}
}(i)
}
// 提交任务
for i := 1; i <= 5; i++ {
taskID := i
taskQueue <- func() {
time.Sleep(1 * time.Second) // 模拟任务执行
fmt.Printf("Task %d executed\n", taskID)
}
}
close(taskQueue) // 关闭队列,触发Worker退出
wg.Wait() // 等待所有Worker退出
fmt.Println("All tasks completed")
}
执行结果:
Worker 1 started task
Worker 2 started task
Worker 3 started task
Task 1 executed
Worker 1 completed task
Worker 1 started task
Task 2 executed
Worker 2 completed task
...
5. 高级优化与变种
- 动态调整Worker数量:
- 通过信号量(如带缓冲通道)控制Worker数量,实现弹性扩缩容。
- 任务优先级队列:
- 使用多个通道或自定义堆结构实现优先级调度。
- 错误处理与重试:
- 在Task中增加错误返回,通过额外通道收集错误并重试。
- 优雅关闭:
- 使用
context.Context通知Worker退出,避免任务丢失。
- 使用
6. 总结
- 优势:资源可控、避免Goroutine泄露、任务调度灵活。
- 注意点:任务队列容量需合理设置,避免阻塞或内存溢出。
- 适用性:适合任务独立、无严格执行顺序的场景。