gitcaddy-runner/internal/app/poll/poller.go

// Copyright 2023 The Gitea Authors and MarketAlly. All rights reserved.
// SPDX-License-Identifier: MIT

// Package poll provides task polling functionality for CI runners.
package poll

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"

	runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
	"connectrpc.com/connect"
	log "github.com/sirupsen/logrus"
	"golang.org/x/time/rate"

	"git.marketally.com/gitcaddy/gitcaddy-runner/internal/app/run"
	"git.marketally.com/gitcaddy/gitcaddy-runner/internal/pkg/cleanup"
	"git.marketally.com/gitcaddy/gitcaddy-runner/internal/pkg/client"
	"git.marketally.com/gitcaddy/gitcaddy-runner/internal/pkg/config"
	"git.marketally.com/gitcaddy/gitcaddy-runner/internal/pkg/envcheck"
)

// Poller handles task polling from the Gitea server.
type Poller struct {
	client           client.Client
	runner           *run.Runner
	cfg              *config.Config
	tasksVersion     atomic.Int64 // version of the last task fetched from the Gitea server
	bandwidthManager *envcheck.BandwidthManager

	pollingCtx      context.Context
	shutdownPolling context.CancelFunc

	jobsCtx      context.Context
	shutdownJobs context.CancelFunc

	done chan struct{}
}

// New creates a new Poller instance with the given context for shutdown propagation.
func New(ctx context.Context, cfg *config.Config, client client.Client, runner *run.Runner) *Poller {
	// Inherit from parent context so shutdown signals propagate properly
	pollingCtx, shutdownPolling := context.WithCancel(ctx)
	jobsCtx, shutdownJobs := context.WithCancel(ctx)
	done := make(chan struct{})
	return &Poller{
		client:          client,
		runner:          runner,
		cfg:             cfg,
		pollingCtx:      pollingCtx,
		shutdownPolling: shutdownPolling,
		jobsCtx:         jobsCtx,
		shutdownJobs:    shutdownJobs,
		done:            done,
	}
}

// SetBandwidthManager sets the bandwidth manager for on-demand testing.
func (p *Poller) SetBandwidthManager(bm *envcheck.BandwidthManager) {
	p.bandwidthManager = bm
}
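
// Typical wiring, shown only as a sketch; the cfg, cli, runner, and bm values
// here are assumed to be built by the caller (for example in the daemon's
// startup path) and are not defined in this file:
//
//	p := poll.New(ctx, cfg, cli, runner)
//	p.SetBandwidthManager(bm) // optional: enables on-demand bandwidth tests
//	go p.Poll()
//	// ...later, give running jobs a bounded window to finish:
//	shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
//	defer cancel()
//	_ = p.Shutdown(shutdownCtx)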

// Poll starts polling for tasks with the configured capacity.
func (p *Poller) Poll() {
	limiter := rate.NewLimiter(rate.Every(p.cfg.Runner.FetchInterval), 1)
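	// All worker goroutines below share this one limiter, so the runner issues
	// at most one FetchTask request per FetchInterval regardless of capacity.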
	wg := &sync.WaitGroup{}
	for i := 0; i < p.cfg.Runner.Capacity; i++ {
		wg.Add(1)
		go p.poll(wg, limiter)
	}
	wg.Wait()

	// signal that we have shut down
	close(p.done)
}

// PollOnce polls for a single task and then exits.
func (p *Poller) PollOnce() {
	limiter := rate.NewLimiter(rate.Every(p.cfg.Runner.FetchInterval), 1)
	p.pollOnce(limiter)

	// signal that we're done
	close(p.done)
}

// Shutdown gracefully stops the poller.
func (p *Poller) Shutdown(ctx context.Context) error {
	p.shutdownPolling()

	select {
	// graceful shutdown completed successfully
	case <-p.done:
		return nil

	// our timeout for shutting down ran out
	case <-ctx.Done():
		// When both the timeout fires and the graceful shutdown completed
		// successfully, this branch of the select may fire. Do a non-blocking
		// check here against the graceful shutdown status to avoid returning an
		// error if we don't need to.
		select {
		case <-p.done:
			return nil
		default:
		}

		// force a shutdown of all running jobs
		p.shutdownJobs()

		// wait for running jobs to report their status to Gitea
		<-p.done

		return ctx.Err()
	}
}
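
// poll is the loop for one worker slot: it keeps fetching and running tasks
// until the polling context is cancelled.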
func (p *Poller) poll(wg *sync.WaitGroup, limiter *rate.Limiter) {
	defer wg.Done()
	for {
		p.pollOnce(limiter)

		select {
		case <-p.pollingCtx.Done():
			return
		default:
			continue
		}
	}
}
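
// pollOnce blocks until a single task has been fetched and executed, or until
// the polling context is cancelled.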
func (p *Poller) pollOnce(limiter *rate.Limiter) {
	for {
		if err := limiter.Wait(p.pollingCtx); err != nil {
			if p.pollingCtx.Err() != nil {
				log.WithError(err).Debug("limiter wait failed")
			}
			return
		}

		task, ok := p.fetchTask(p.pollingCtx)
		if !ok {
			continue
		}

		p.runTaskWithRecover(p.jobsCtx, task)
		return
	}
}
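
// runTaskWithRecover executes a task and converts any panic into a logged
// error so that a misbehaving task cannot crash the polling loop.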
func (p *Poller) runTaskWithRecover(ctx context.Context, task *runnerv1.Task) {
	defer func() {
		if r := recover(); r != nil {
			err := fmt.Errorf("panic: %v", r)
			log.WithError(err).Error("panic in runTaskWithRecover")
		}
	}()

	if err := p.runner.Run(ctx, task); err != nil {
		log.WithError(err).Error("failed to run task")
	}
}
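
// fetchTask reports the runner's current capabilities to the server and asks
// for a task to run. It returns (nil, false) when the request fails or no task
// is available.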
func (p *Poller) fetchTask(ctx context.Context) (*runnerv1.Task, bool) {
	reqCtx, cancel := context.WithTimeout(ctx, p.cfg.Runner.FetchTimeout)
	defer cancel()
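	// reqCtx bounds only the FetchTask RPC below; the fetched task itself runs
	// later under jobsCtx.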

	// Detect capabilities including current disk space
	caps := envcheck.DetectCapabilities(ctx, p.cfg.Container.DockerHost, p.cfg.Container.WorkdirParent, p.cfg.Runner.Capacity)

	// Include latest bandwidth result if available
	if p.bandwidthManager != nil {
		caps.Bandwidth = p.bandwidthManager.GetLastResult()
	}
	capsJSON := caps.ToJSON()

	// Load the version value that was in the cache when the request was sent.
	v := p.tasksVersion.Load()
	fetchReq := &runnerv1.FetchTaskRequest{
		TasksVersion:     v,
		CapabilitiesJson: capsJSON,
	}

	resp, err := p.client.FetchTask(reqCtx, connect.NewRequest(fetchReq))
	if errors.Is(err, context.DeadlineExceeded) {
		err = nil
	}
	if err != nil {
		log.WithError(err).Error("failed to fetch task")
		return nil, false
	}
	if resp == nil || resp.Msg == nil {
		return nil, false
	}

	// Check if server requested a bandwidth test
	if resp.Msg.RequestBandwidthTest && p.bandwidthManager != nil {
		log.Info("Server requested bandwidth test, running now...")
		go func() {
			result := p.bandwidthManager.RunTest(ctx)
			if result != nil {
				log.Infof("Bandwidth test completed: %.1f Mbps download, %.0f ms latency",
					result.DownloadMbps, result.Latency)
			}
		}()
	}

	// Check if server requested a cleanup
	if resp.Msg.RequestCleanup {
		log.Info("Server requested cleanup, running now...")
		go func() {
			result, err := cleanup.RunCleanup(ctx, p.cfg)
			if err != nil {
				log.Errorf("Cleanup failed: %v", err)
			} else if result != nil {
				log.Infof("Cleanup completed: freed %d bytes, deleted %d files in %s",
					result.BytesFreed, result.FilesDeleted, result.Duration)
			}
		}()
	}
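
	// CompareAndSwap guards against racing with another poll goroutine that may
	// have already stored a newer tasks version.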
	if resp.Msg.TasksVersion > v {
		p.tasksVersion.CompareAndSwap(v, resp.Msg.TasksVersion)
	}

	if resp.Msg.Task == nil {
		return nil, false
	}

	// got a task, set tasksVersion to zero to force a db query in the next request
	p.tasksVersion.CompareAndSwap(resp.Msg.TasksVersion, 0)

	return resp.Msg.Task, true
}