Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More resilient droplet upload #195

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ccclient/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func (p *poller) Poll(fallbackURL *url.URL, res *http.Response, cancelChan <-cha

switch body.Entity.Status {
case JOB_QUEUED, JOB_RUNNING:
p.logger.Info("cc-job-queued-or-running", lager.Data{"status": body.Entity.Status})
case JOB_FINISHED:
p.logger.Info("cc-job-finished")
return nil
Expand Down
102 changes: 97 additions & 5 deletions cmd/cc-uploader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@ import (
"log"
"net"
"net/http"
"net/url"
"os"
"os/signal"
"runtime"
"sync"
"syscall"
"time"

"code.cloudfoundry.org/cc-uploader/ccclient"
Expand Down Expand Up @@ -41,6 +45,9 @@ const (
communicationTimeout = 30 * time.Second
)

// uploadWaitGroup counts polling calls that are currently in flight.
// trackedPoller.Poll increments it on entry and decrements it on return;
// main calls Wait on it after a shutdown signal so that outstanding uploads
// can finish before the process proceeds with shutdown.
var uploadWaitGroup sync.WaitGroup

func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
flag.Parse()
Expand All @@ -54,6 +61,19 @@ func main() {

initializeDropsonde(logger, uploaderConfig)

// Create signal channel to listen for shutdown signals
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)

// Goroutine to log any signal received (without handling non-TERM signals)
go func() {
allSignals := make(chan os.Signal, 1)
signal.Notify(allSignals) // Capture all signals for logging
for sig := range allSignals {
logger.Info("received-signal", lager.Data{"signal": sig.String()})
}
}()

members := grouper.Members{
{"cc-uploader-tls", initializeServer(logger, uploaderConfig, true)},
}
Expand All @@ -73,10 +93,32 @@ func main() {
monitor := ifrit.Invoke(sigmon.New(group))
logger.Info("ready")

err = <-monitor.Wait()
if err != nil {
logger.Error("exited-with-failure", err)
os.Exit(1)
select {
case err := <-monitor.Wait(): // Handle process failure
if err != nil {
logger.Info("exited-with-failure")
os.Exit(1)
}

case sig := <-signalChan: // Handle shutdown signal
logger.Info("shutdown-signal-received", lager.Data{"signal": sig})

// Gracefully signal Ifrit monitor to stop processes
monitor.Signal(os.Interrupt)
logger.Info("graceful-shutdown-waiting-for-uploads")

// Wait for all uploads to finish before shutting down
uploadWaitGroup.Wait()
logger.Info("all-uploads-completed, waiting before shutdown")
time.Sleep(20 * time.Second)
logger.Info("proceeding with shutdown")
logger.Info("all-uploads-completed, shutting down")

logger.Info("graceful-shutdown-completed")
pid := os.Getpid()

logger.Info("Forcefully terminate if necessary")
forceShutdown(pid, "cc-uploader", logger)
}

logger.Info("exited")
Expand Down Expand Up @@ -128,8 +170,10 @@ func initializeServer(logger lager.Logger, uploaderConfig config.UploaderConfig,

// To maintain backwards compatibility with hairpin polling URLs, skip SSL verification for now
poller := ccclient.NewPoller(logger, &http.Client{Transport: initializeTlsTransport(uploaderConfig, true)}, time.Duration(uploaderConfig.CCJobPollingInterval))
// Wrap the poller to track job completion
trackedPoller := trackPollingCompletion(poller, logger)

ccUploaderHandler, err := handlers.New(uploader, poller, logger)
ccUploaderHandler, err := handlers.New(uploader, trackedPoller, logger)
if err != nil {
logger.Error("router-building-failed", err)
os.Exit(1)
Expand Down Expand Up @@ -158,3 +202,51 @@ func initializeServer(logger lager.Logger, uploaderConfig config.UploaderConfig,
}
return http_server.New(uploaderConfig.ListenAddress, ccUploaderHandler)
}

// trackPollingCompletion decorates poller with a trackedPoller so that each
// Poll call is accounted for in uploadWaitGroup. The wrapper logs through a
// "tracked-poller" session derived from logger.
func trackPollingCompletion(poller ccclient.Poller, logger lager.Logger) ccclient.Poller {
	wrapped := new(trackedPoller)
	wrapped.Poller = poller
	wrapped.logger = logger.Session("tracked-poller")
	return wrapped
}

// trackedPoller wraps a ccclient.Poller; its Poll method registers each call
// in the package-level uploadWaitGroup for the duration of the call and logs
// the outcome.
type trackedPoller struct {
// Embedded poller that performs the actual polling.
ccclient.Poller
// Logger used by Poll to report success or failure; set to a
// "tracked-poller" session by trackPollingCompletion.
logger lager.Logger
}

// Poll delegates to the wrapped Poller while keeping uploadWaitGroup
// incremented for the duration of the call, so that a shutdown waiting on the
// group blocks until this poll returns. The result of the wrapped call is
// logged and returned unchanged.
//
// NOTE(review): Add is called here while main may already be blocked in
// Wait; confirm this ordering is acceptable per the sync.WaitGroup contract.
func (tp *trackedPoller) Poll(uploadUrl *url.URL, uploadResponse *http.Response, cancelChan <-chan struct{}) error {
	uploadWaitGroup.Add(1)
	// Deferred so the counter is released on both the success and error paths.
	defer uploadWaitGroup.Done()

	if pollErr := tp.Poller.Poll(uploadUrl, uploadResponse, cancelChan); pollErr != nil {
		tp.logger.Error("polling-failed", pollErr)
		return pollErr
	}

	tp.logger.Info("polling-succeeded")
	return nil
}

// forceShutdown looks up the process identified by pid and, after a fixed
// five-second delay, kills it. processName is used only in log output.
//
// If the process lookup fails, the function logs that the process has already
// terminated and returns without killing anything. A failure of the kill
// itself is logged but not returned.
//
// NOTE(review): per the os package docs, FindProcess is reported to always
// succeed on Unix regardless of whether the process exists — confirm whether
// the lookup-failure branch is reachable on the target platforms.
func forceShutdown(pid int, processName string, logger lager.Logger) {
	// Delay between announcing the forced shutdown and sending the kill.
	const killDelay = 5 * time.Second

	target, lookupErr := os.FindProcess(pid)
	if lookupErr != nil {
		logger.Info(fmt.Sprintf("Process '%s' with pid '%d' already terminated.", processName, pid))
		return
	}

	logger.Info(fmt.Sprintf("Forcefully shutting down process '%s' with pid '%d'", processName, pid))

	time.Sleep(killDelay)

	if killErr := target.Kill(); killErr != nil {
		logger.Error("failed-to-forcefully-shutdown-process", killErr, lager.Data{
			"process": processName,
			"pid":     pid,
		})
	}
}