74 lines
1.7 KiB
Go
74 lines
1.7 KiB
Go
|
package media
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"errors"
|
||
|
"time"
|
||
|
|
||
|
"go.uber.org/zap"
|
||
|
)
|
||
|
|
||
|
// WorkerPool is a pool of workers that can consume and run a queue of tasks.
|
||
|
type WorkerPool struct {
|
||
|
size int
|
||
|
ch chan func()
|
||
|
logger *zap.SugaredLogger
|
||
|
}
|
||
|
|
||
|
// NewWorkerPool returns a new WorkerPool containing the specified number of
|
||
|
// workers, and with the provided maximum queue size. Jobs added to the queue
|
||
|
// after it reaches this size limit will be rejected.
|
||
|
func NewWorkerPool(size int, maxQueueSize int, logger *zap.SugaredLogger) *WorkerPool {
|
||
|
return &WorkerPool{
|
||
|
size: size,
|
||
|
ch: make(chan func(), maxQueueSize),
|
||
|
logger: logger,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// NewTestWorkerPool returns a new running WorkerPool with a single worker,
|
||
|
// and noop logger, suitable for test environments.
|
||
|
func NewTestWorkerPool() *WorkerPool {
|
||
|
p := NewWorkerPool(1, 256, zap.NewNop().Sugar())
|
||
|
p.Run()
|
||
|
return p
|
||
|
}
|
||
|
|
||
|
// Run launches the workers, and returns immediately.
|
||
|
func (p *WorkerPool) Run() {
|
||
|
for i := 0; i < p.size; i++ {
|
||
|
go func() {
|
||
|
for task := range p.ch {
|
||
|
task()
|
||
|
}
|
||
|
}()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// WaitForTask blocks while the provided task is executed by a worker,
|
||
|
// returning the error returned by the task.
|
||
|
func (p *WorkerPool) WaitForTask(ctx context.Context, taskFunc func() error) error {
|
||
|
done := make(chan error)
|
||
|
queuedAt := time.Now()
|
||
|
fn := func() {
|
||
|
startedAt := time.Now()
|
||
|
result := taskFunc()
|
||
|
|
||
|
durTotal := time.Since(queuedAt)
|
||
|
durTask := time.Since(startedAt)
|
||
|
durQueue := startedAt.Sub(queuedAt)
|
||
|
p.logger.With("task", durTask, "queue", durQueue, "total", durTotal).Infof("Completed task")
|
||
|
|
||
|
done <- result
|
||
|
}
|
||
|
|
||
|
select {
|
||
|
case p.ch <- fn:
|
||
|
return <-done
|
||
|
case <-ctx.Done():
|
||
|
return ctx.Err()
|
||
|
default:
|
||
|
return errors.New("worker queue full")
|
||
|
}
|
||
|
}
|