package river

import (
	"context"
	"encoding/json"
	"errors"
	"log/slog"
	"sync/atomic"
	"time"

	"github.com/riverqueue/river/internal/baseservice"
	"github.com/riverqueue/river/internal/componentstatus"
	"github.com/riverqueue/river/internal/dbadapter"
	"github.com/riverqueue/river/internal/jobcompleter"
	"github.com/riverqueue/river/internal/notifier"
	"github.com/riverqueue/river/internal/util/chanutil"
	"github.com/riverqueue/river/internal/util/sliceutil"
)

type producerConfig struct {
	ErrorHandler ErrorHandler

	// FetchCooldown is the minimum amount of time to wait between fetches of new
	// jobs. Jobs will be fetched *at most* this often, but if no new jobs are
	// coming in via LISTEN/NOTIFY then fetches may be delayed as long as
	// FetchPollInterval.
	FetchCooldown time.Duration

	// FetchPollInterval is the amount of time between periodic fetches for new
	// jobs. Typically new jobs will be picked up ~immediately after insert via
	// LISTEN/NOTIFY, but this provides a fallback.
	FetchPollInterval time.Duration

	JobTimeout     time.Duration
	MaxWorkerCount uint16
	Notifier       *notifier.Notifier
	QueueName      string
	RetryPolicy    ClientRetryPolicy
	WorkerName     string
	Workers        *Workers
}

// producer manages a fleet of Workers up to a maximum size. It periodically
// fetches jobs from the adapter and dispatches them to Workers, and receives
// completed job results back from those Workers.
//
// The producer never fetches more jobs than the number of free Worker slots it
// has available. This is not optimal for throughput compared to pre-fetching
// extra jobs, but it is better for smaller job counts or slower jobs where even
// distribution and minimizing execution latency are more important.
type producer struct {
	baseservice.BaseService

	// Jobs which are currently being worked. Only used by main goroutine.
	activeJobs map[int64]*jobExecutor

	adapter      dbadapter.Adapter
	completer    jobcompleter.JobCompleter
	config       *producerConfig
	errorHandler ErrorHandler
	workers      *Workers

	// Receives completed jobs from workers. Written to by workers as they
	// finish; only read from main goroutine.
	jobResultCh chan *JobRow

	jobTimeout time.Duration

	// An atomic count of the number of jobs actively being worked on. This is
	// written to by the main goroutine, but read by the dispatcher.
	numJobsActive atomic.Int32

	numJobsRan  atomic.Uint64
	retryPolicy ClientRetryPolicy
}
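// newProducer validates the given config, returning an error describing the
// first problem found, and initializes a producer from the archetype. The
// returned producer is not yet started; callers are expected to invoke Run.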
func newProducer(archetype *baseservice.Archetype, adapter dbadapter.Adapter, completer jobcompleter.JobCompleter, config *producerConfig) (*producer, error) {
	if adapter == nil {
		return nil, errors.New("Adapter is required") //nolint:stylecheck
	}
	if completer == nil {
		return nil, errors.New("Completer is required") //nolint:stylecheck
	}
	if config.FetchCooldown <= 0 {
		return nil, errors.New("FetchCooldown must be greater than zero")
	}
	if config.FetchPollInterval <= 0 {
		return nil, errors.New("FetchPollInterval must be greater than zero")
	}
	if config.JobTimeout < -1 {
		return nil, errors.New("JobTimeout must be greater than or equal to -1")
	}
	if config.MaxWorkerCount == 0 {
		return nil, errors.New("MaxWorkerCount is required")
	}
	if config.Notifier == nil {
		return nil, errors.New("Notifier is required") //nolint:stylecheck
	}
	if config.QueueName == "" {
		return nil, errors.New("QueueName is required")
	}
	if config.RetryPolicy == nil {
		return nil, errors.New("RetryPolicy is required")
	}
	if config.WorkerName == "" {
		return nil, errors.New("WorkerName is required")
	}
	if config.Workers == nil {
		return nil, errors.New("Workers is required")
	}

	return baseservice.Init(archetype, &producer{
		activeJobs:   make(map[int64]*jobExecutor),
		adapter:      adapter,
		completer:    completer,
		config:       config,
		errorHandler: config.ErrorHandler,
		jobResultCh:  make(chan *JobRow, config.MaxWorkerCount),
		jobTimeout:   config.JobTimeout,
		retryPolicy:  config.RetryPolicy,
		workers:      config.Workers,
	}), nil
}

type producerStatusUpdateFunc func(queue string, status componentstatus.Status)

// Run starts the producer. It blocks until the producer has completed
// graceful shutdown.
//
// When fetchCtx is cancelled, no more jobs will be fetched; however, if a fetch
// is already in progress, it will be allowed to complete and run any fetched
// jobs. When workCtx is cancelled, any in-progress jobs will have their
// contexts cancelled too.
func (p *producer) Run(fetchCtx, workCtx context.Context, statusFunc producerStatusUpdateFunc) {
	p.Logger.InfoContext(workCtx, p.Name+": Producer started", slog.String("queue", p.config.QueueName))
	defer func() {
		p.Logger.InfoContext(workCtx, p.Name+": Producer stopped", slog.String("queue", p.config.QueueName), slog.Uint64("num_completed_jobs", p.numJobsRan.Load()))
	}()

	go p.heartbeatLogLoop(fetchCtx)

	statusFunc(p.config.QueueName, componentstatus.Initializing)

	// TODO: fetcher should have some jitter in it to avoid stampeding issues.
	fetchLimiter := chanutil.NewDebouncedChan(fetchCtx, p.config.FetchCooldown)

	p.fetchAndRunLoop(fetchCtx, workCtx, fetchLimiter, statusFunc)
	statusFunc(p.config.QueueName, componentstatus.ShuttingDown)
	p.executorShutdownLoop()
	statusFunc(p.config.QueueName, componentstatus.Stopped)
}

// insertPayload is the expected shape of a LISTEN/NOTIFY insert notification
// payload, e.g. {"queue": "default"}.
type insertPayload struct {
	Queue string `json:"queue"`
}
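// fetchAndRunLoop is the main run loop of the producer. It subscribes to
// insert notifications for its queue, polls on FetchPollInterval as a
// fallback, and triggers a fetch through fetchLimiter whenever either fires.
// It returns once fetchCtx is cancelled.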
func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context, fetchLimiter *chanutil.DebouncedChan, statusFunc producerStatusUpdateFunc) {
	p.Logger.InfoContext(workCtx, p.Name+": Run loop started")
	defer p.Logger.InfoContext(workCtx, p.Name+": Run loop stopped")

	// Prime the fetchLimiter so we can make an initial fetch without waiting for
	// an insert notification or a fetch poll.
	fetchLimiter.Call()

	handleInsertNotification := func(topic notifier.NotificationTopic, payload string) {
		var decoded insertPayload
		if err := json.Unmarshal([]byte(payload), &decoded); err != nil {
			p.Logger.ErrorContext(workCtx, p.Name+": Failed to unmarshal insert notification payload", slog.String("err", err.Error()))
			return
		}
		if decoded.Queue != p.config.QueueName {
			return
		}
		p.Logger.DebugContext(workCtx, p.Name+": Received insert notification", slog.String("queue", decoded.Queue))
		fetchLimiter.Call()
	}
	sub := p.config.Notifier.Listen(notifier.NotificationTopicInsert, handleInsertNotification)
	defer sub.Unlisten()

	fetchPollTimer := time.NewTimer(p.config.FetchPollInterval)
	go func() {
		for {
			select {
			case <-fetchCtx.Done():
				// Stop fetch timer so no more fetches are triggered.
				if !fetchPollTimer.Stop() {
					<-fetchPollTimer.C
				}
				return
			case <-fetchPollTimer.C:
				fetchLimiter.Call()
				fetchPollTimer.Reset(p.config.FetchPollInterval)
			}
		}
	}()

	statusFunc(p.config.QueueName, componentstatus.Healthy)

	fetchResultCh := make(chan producerFetchResult)
	for {
		select {
		case <-fetchCtx.Done():
			return
		case <-fetchLimiter.C():
			p.innerFetchLoop(workCtx, fetchResultCh)
			// Ensure we can't start another fetch when fetchCtx is done, even if
			// the fetchLimiter is also ready to fire:
			select {
			case <-fetchCtx.Done():
				return
			default:
			}
		case result := <-p.jobResultCh:
			p.removeActiveJob(result.ID)
		}
	}
}

func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan producerFetchResult) {
	count := p.maxJobsToFetch()
	go p.dispatchWork(count, fetchResultCh) //nolint:contextcheck

	for {
		select {
		case result := <-fetchResultCh:
			if result.err != nil {
				p.Logger.ErrorContext(workCtx, p.Name+": Error fetching jobs", slog.String("err", result.err.Error()))
			} else if len(result.jobs) > 0 {
				p.startNewExecutors(workCtx, result.jobs)
			}
			return
		case result := <-p.jobResultCh:
			p.removeActiveJob(result.ID)
		}
	}
}

func (p *producer) executorShutdownLoop() {
	// No more jobs will be fetched or executed. However, we must wait for all
	// in-progress jobs to complete.
	for {
		if len(p.activeJobs) == 0 {
			break
		}
		result := <-p.jobResultCh
		p.removeActiveJob(result.ID)
	}
}

func (p *producer) addActiveJob(id int64, executor *jobExecutor) {
	p.numJobsActive.Add(1)
	p.activeJobs[id] = executor
}

func (p *producer) removeActiveJob(id int64) {
	delete(p.activeJobs, id)
	p.numJobsActive.Add(-1)
	p.numJobsRan.Add(1)
}
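// dispatchWork fetches up to count available jobs from the adapter and
// delivers the result on jobsFetchedCh. It's expected to run on its own
// goroutine, so rather than returning an error, failures are sent through the
// result channel.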
func (p *producer) dispatchWork(count int32, jobsFetchedCh chan<- producerFetchResult) {
	// This intentionally uses a background context because we don't want it to
	// get cancelled if the producer is asked to shut down. In that situation, we
	// want to finish fetching any jobs we are in the midst of fetching, work
	// them, and then stop. Otherwise we'd risk shutting down with jobs already
	// fetched in the database, leaving those jobs stranded until they could be
	// released back to the queue.
	internalJobs, err := p.adapter.JobGetAvailable(context.Background(), p.config.QueueName, count)
	if err != nil {
		jobsFetchedCh <- producerFetchResult{err: err}
		return
	}
	jobs := sliceutil.Map(internalJobs, jobRowFromInternal)
	jobsFetchedCh <- producerFetchResult{jobs: jobs}
}

// heartbeatLogLoop periodically logs an informational log line giving some
// insight into the current state of the producer.
func (p *producer) heartbeatLogLoop(ctx context.Context) {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			p.Logger.InfoContext(ctx, p.Name+": Heartbeat",
				slog.Uint64("num_completed_jobs", p.numJobsRan.Load()),
				slog.Int("num_jobs_running", int(p.numJobsActive.Load())),
				slog.String("queue", p.config.QueueName),
			)
		}
	}
}

func (p *producer) startNewExecutors(workCtx context.Context, jobs []*JobRow) {
	for _, job := range jobs {
		workInfo, ok := p.workers.workersMap[job.Kind]

		var workUnit workUnit
		if ok {
			workUnit = workInfo.workUnitFactory.MakeUnit(job)
		}

		executor := baseservice.Init(&p.Archetype, &jobExecutor{
			Adapter:                p.adapter,
			ClientJobTimeout:       p.jobTimeout,
			ClientRetryPolicy:      p.retryPolicy,
			Completer:              p.completer,
			ErrorHandler:           p.errorHandler,
			InformProducerDoneFunc: p.handleWorkerDone,
			JobRow:                 job,
			WorkUnit:               workUnit,
		})
		p.addActiveJob(job.ID, executor)

		go executor.Execute(workCtx)

		// TODO:
		// Errors can be recorded synchronously before the Executor slot is
		// considered available.
		//
		// Successful jobs can be sent to the completer for async acking, IF they
		// aren't already completed by the user. Do we need an internal field +
		// convenience method to make that part work?
	}
}

func (p *producer) maxJobsToFetch() int32 {
	return int32(p.config.MaxWorkerCount) - p.numJobsActive.Load()
}

func (p *producer) handleWorkerDone(job *JobRow) {
	p.jobResultCh <- job
}

type producerFetchResult struct {
	jobs []*JobRow
	err  error
}