- 大概流程就是job -> JobQueue
- 调度器循环获取JobQueue ,获取到的job ,再去异步获取等待可用的 worker,取出 chan Job,将job 写入改worker的 chan Job
- worker 处理任务,先处理 case job := <-w.JobChannel: 处理完成后再将 chan Job 写入到worker 里面,等待调度去取调用
package mainimport ("log""os""strconv""sync""time"
)var (MaxWorker intMaxQueue intJobQueue chan Job
)func init() {var err errorMaxWorker, err = strconv.Atoi(os.Getenv("MAX_WORKERS"))if err != nil {MaxWorker = 5 }MaxQueue, err = strconv.Atoi(os.Getenv("MAX_QUEUE"))if err != nil {MaxQueue = 10 }JobQueue = make(chan Job, MaxQueue)
}type Payload struct {
}func (p *Payload) UploadToS3() error {log.Println("Uploading to S3")return nil
}type Job struct {Payload Payload
}type Worker struct {WorkerPool chan chan JobJobChannel chan Jobquit chan bool
}func NewWorker(workerPool chan chan Job) Worker {return Worker{WorkerPool: workerPool,JobChannel: make(chan Job),quit: make(chan bool)}
}func (w Worker) Start() {go func() {for {w.WorkerPool <- w.JobChannelselect {case job := <-w.JobChannel:if err := job.Payload.UploadToS3(); err != nil {log.Printf("Error uploading to S3: %s", err)}case <-w.quit:return}}}()
}func (w *Worker) Stop() {go func() {w.quit <- true }()
}type Dispatcher struct {WorkerPool chan chan JobmaxWorkers intworkers []Worker quit chan bool
}func NewDispatcher(maxWorkers int) *Dispatcher {return &Dispatcher{WorkerPool: make(chan chan Job, maxWorkers),maxWorkers: maxWorkers,workers: make([]Worker, 0, maxWorkers),}
}func (d *Dispatcher) Runs() {for i := 0; i < d.maxWorkers; i++ {worker := NewWorker(d.WorkerPool)d.workers = append(d.workers, worker) worker.Start()}go d.dispatch()
}func (d *Dispatcher) dispatch() {for {select {case job := <-JobQueue:go func(job Job) {jobChannel := <-d.WorkerPooljobChannel <- job}(job)case <-d.quit:return}}
}func (d *Dispatcher) StopAllWorkers() {var wg sync.WaitGroupfor _, worker := range d.workers {wg.Add(1)go func(w Worker) {w.Stop() wg.Done()}(worker)}wg.Wait()
}func (d *Dispatcher) Stop() {d.quit <- trued.StopAllWorkers()
}func main() {dispatcher := NewDispatcher(MaxWorker)dispatcher.Runs()for i := 0; i < 20; i++ {payload := Payload{ }job := Job{Payload: payload}JobQueue <- job}time.Sleep(10 * time.Second)
}