47 changes: 22 additions & 25 deletions lib/adaptivequemgr.go
@@ -21,8 +21,8 @@ import (
"errors"
"fmt"
"math/rand"
"strings"
"sync/atomic"
"strings"
"time"

"github.com/paypal/hera/cal"
@@ -118,31 +118,28 @@ type BindCount struct {
Workers map[string]*WorkerClient // lookup by ticket
}

func bindEvictNameOk(bindName string) bool {
func bindEvictNameOk(bindName string) (bool) {
commaNames := GetConfig().BindEvictionNames
if len(commaNames) == 0 {
// for tests, allow all names to be subject to bind eviction
return true
}
commaNames = strings.ToLower(commaNames)
bindName = strings.ToLower(bindName)
for _, okSubname := range strings.Split(commaNames, ",") {
for _, okSubname := range strings.Split(commaNames,",") {
if strings.Contains(bindName, okSubname) {
return true
}
}
return false
}

/*
A bad query with multiple binds will add independent bind throttles to all

bind name and values
*/
func (mgr *adaptiveQueueManager) doBindEviction() int {
/* A bad query with multiple binds will add independent bind throttles to all
bind name and values */
func (mgr *adaptiveQueueManager) doBindEviction() (int) {
throttleCount := 0
GetBindEvict().lock.Lock()
for _, keyValues := range GetBindEvict().BindThrottle {
for _,keyValues := range GetBindEvict().BindThrottle {
throttleCount += len(keyValues)
}
GetBindEvict().lock.Unlock()
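Note: bindEvictNameOk above treats an empty BindEvictionNames config as "allow all" (the test shortcut in the comment) and otherwise does a case-insensitive substring match against the comma-separated list. A minimal usage sketch, with illustrative config values:

    // With BindEvictionNames = "id,acct", any bind whose lowercased name
    // contains "id" or "acct" is subject to eviction.
    bindEvictNameOk("CUSTOMER_ID") // true: "customer_id" contains "id"
    bindEvictNameOk("price")       // false: matches neither substring
    // With BindEvictionNames = "", every bind name is eligible.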
@@ -175,14 +172,14 @@ func (mgr *adaptiveQueueManager) doBindEviction() int {
}
continue
}
contextBinds := parseBinds(request)
sqlsrcPrefix := worker.clientHostPrefix.Load().(string)
sqlsrcApp := worker.clientApp.Load().(string)
contextBinds := parseBinds(request)
sqlsrcPrefix := worker.clientHostPrefix.Load().(string)
sqlsrcApp := worker.clientApp.Load().(string)
if sqlsrcPrefix != "" {
contextBinds[SrcPrefixAppKey] = fmt.Sprintf("%s%s", sqlsrcPrefix, sqlsrcApp)
if logger.GetLogger().V(logger.Debug) {
msg := fmt.Sprintf("Req info: Add AZ+App to contextBinds: %s", contextBinds[SrcPrefixAppKey])
logger.GetLogger().Log(logger.Debug, msg)
msg := fmt.Sprintf("Req info: Add AZ+App to contextBinds: %s", contextBinds[SrcPrefixAppKey])
logger.GetLogger().Log(logger.Debug, msg)
}
}
for bindName0, bindValue := range contextBinds {
@@ -203,8 +200,8 @@ func (mgr *adaptiveQueueManager) doBindEviction() int {
}
concatKey := fmt.Sprintf("%d|%s|%s", sqlhash, bindName, bindValue)
if logger.GetLogger().V(logger.Debug) {
msg := fmt.Sprintf("Req info: lookup concatKey = %s in bindCounts", concatKey)
logger.GetLogger().Log(logger.Debug, msg)
msg := fmt.Sprintf("Req info: lookup concatKey = %s in bindCounts", concatKey)
logger.GetLogger().Log(logger.Debug, msg)
}
entry, ok := bindCounts[concatKey]
if !ok {
@@ -213,7 +210,7 @@ func (mgr *adaptiveQueueManager) doBindEviction() int {
Name: bindName,
Value: bindValue,
Workers: make(map[string]*WorkerClient),
}
}
bindCounts[concatKey] = entry
}

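The lookup key shown above flattens (sqlhash, bind name, bind value) into one string. A sketch of the counting pattern; the final worker-ticket assignment is assumed from the Workers map's "lookup by ticket" comment rather than shown in this hunk:

    concatKey := fmt.Sprintf("%d|%s|%s", sqlhash, bindName, bindValue)
    entry, ok := bindCounts[concatKey]
    if !ok {
        entry = &BindCount{
            Name:    bindName,
            Value:   bindValue,
            Workers: make(map[string]*WorkerClient),
        }
        bindCounts[concatKey] = entry
    }
    entry.Workers[ticket] = worker // presumed: dedup, one entry per worker ticket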
@@ -230,7 +227,7 @@ func (mgr *adaptiveQueueManager) doBindEviction() int {
bindName := entry.Name
bindValue := entry.Value

if len(entry.Workers) < int(float64(GetConfig().BindEvictionThresholdPct)/100.*float64(numDispatchedWorkers)) {
if len(entry.Workers) < int( float64(GetConfig().BindEvictionThresholdPct)/100.*float64(numDispatchedWorkers) ) {
continue
}
// evict sqlhash, bindvalue
@@ -244,7 +241,7 @@ func (mgr *adaptiveQueueManager) doBindEviction() int {

if mgr.dispatchedWorkers[worker] != ticket ||
worker.Status == wsFnsh ||
atomic.LoadInt32(&worker.isUnderRecovery) == 1 /* Recover() uses compare & swap */ {
worker.isUnderRecovery == 1 /* Recover() uses compare & swap */ {

continue
}
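One side of this hunk reads worker.isUnderRecovery directly while the other goes through atomic.LoadInt32; since the inline comment says Recover() sets the flag with compare-and-swap, the atomic load is what keeps this read race-free under the Go memory model. A minimal sketch of the pairing, assuming isUnderRecovery is an int32 flag on WorkerClient:

    // Writer (recovery path): claim the flag with CAS so only one
    // recovery runs at a time.
    if atomic.CompareAndSwapInt32(&worker.isUnderRecovery, 0, 1) {
        defer atomic.StoreInt32(&worker.isUnderRecovery, 0)
        // ... perform recovery ...
    }

    // Reader (eviction path): an atomic load pairs with the CAS above;
    // a plain read here would be reported by `go test -race`.
    if atomic.LoadInt32(&worker.isUnderRecovery) == 1 {
        // skip this worker
    }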
@@ -277,10 +274,10 @@ func (mgr *adaptiveQueueManager) doBindEviction() int {
throttle.incrAllowEveryX()
} else {
throttle := BindThrottle{
Name: bindName,
Value: bindValue,
Sqlhash: sqlhash,
AllowEveryX: 3*len(entry.Workers) + 1,
Name: bindName,
Value: bindValue,
Sqlhash: sqlhash,
AllowEveryX: 3*len(entry.Workers) + 1,
}
now := time.Now()
throttle.RecentAttempt.Store(&now)
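The throttle built above starts with AllowEveryX = 3*len(entry.Workers) + 1, so the more workers a bad bind value ties up, the less often it is let through. The admit logic (incrAllowEveryX and the RecentAttempt timestamp) lives outside this diff; a hypothetical sketch of an allow-every-X gate, with invented names:

    // gate admits one request out of every allowEveryX attempts, so a
    // throttled (sqlhash, bind name, bind value) can still probe the DB.
    type gate struct {
        allowEveryX int
        attempts    int
    }

    func (g *gate) allow() bool {
        g.attempts++
        if g.attempts >= g.allowEveryX {
            g.attempts = 0
            return true
        }
        return false
    }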
@@ -467,7 +464,7 @@ func (mgr *adaptiveQueueManager) getWorkerToRecover() (*WorkerClient, bool) {
}
}
} else {
if worker != nil && worker.Status == wsFnsh {
if worker != nil && worker.Status == wsFnsh {
if logger.GetLogger().V(logger.Warning) {
logger.GetLogger().Log(logger.Warning, "worker.pid state is in FNSH, so skipping", worker.pid)
}
18 changes: 4 additions & 14 deletions lib/util.go
@@ -79,8 +79,8 @@ func IsPidRunning(pid int) (isRunning bool) {
}

/*
1st return value: the number
2nd return value: the number of digits
1st return value: the number
2nd return value: the number of digits
*/
func atoi(bf []byte) (int, int) {
sz := len(bf)
@@ -96,8 +96,8 @@ }
}

/*
1st return value: the number
2nd return value: the number of digits
1st return value: the number
2nd return value: the number of digits
*/
func atoui(str string) (uint64, int) {
sz := len(str)
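Both doc comments above promise the same two-value contract: the parsed number and how many characters were consumed. A usage sketch; the function bodies are elided in this diff, so the stop-at-first-non-digit behavior is inferred from the comments:

    n, digits := atoi([]byte("123abc")) // presumably n == 123, digits == 3
    u, used := atoui("42")              // presumably u == 42, used == 2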
@@ -164,13 +164,3 @@ func ExtractSQLHash(request *netstring.Netstring) (uint32, bool) {
}
return 0, false
}

// Contains This is utility method to check whether value present in list or not
func Contains[T comparable](slice []T, value T) bool {
for _, val := range slice {
if val == value {
return true
}
}
return false
}
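The generic helper removed above is a plain linear search; its call sites would have looked like the sketch below. Callers now have to inline the loop or, on Go 1.21+, use slices.Contains from the standard library:

    Contains([]int{3, 5, 8}, 5)           // true
    Contains([]string{"rw", "ro"}, "tls") // false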
116 changes: 53 additions & 63 deletions lib/workerbroker.go
@@ -19,11 +19,13 @@ package lib

import (
"errors"
"github.com/paypal/hera/utility/logger"
"os"
"os/signal"
"sync"
"syscall"

"github.com/paypal/hera/utility"
"github.com/paypal/hera/utility/logger"
)

// HeraWorkerType defines the possible worker type
@@ -62,7 +64,7 @@ type WorkerBroker struct {
// and restart the stopped workers.
//
pidworkermap map[int32]*WorkerClient
lock sync.Mutex
lock sync.Mutex

//
// loaded from cfg once and used later.
@@ -202,9 +204,7 @@ func (broker *WorkerBroker) GetWorkerPoolCfgs() (pCfgs []map[HeraWorkerType]*Wor

// GetWorkerPool get the worker pool object for the type and id
// ids holds optional paramenters.
//
// ids[0] == instance id; ids[1] == shard id.
//
// ids[0] == instance id; ids[1] == shard id.
// if a particular id is not set, it defaults to 0.
// TODO: interchange sid <--> instId since instId is not yet used
func (broker *WorkerBroker) GetWorkerPool(wType HeraWorkerType, ids ...int) (workerbroker *WorkerPool, err error) {
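Per the doc comment, the variadic ids are positional: ids[0] is the instance id, ids[1] the shard id, and anything omitted defaults to 0. A call sketch (the worker-type constant is illustrative):

    // instance 0, shard 0 (both defaulted):
    pool, err := broker.GetWorkerPool(wtypeRW)
    // instance 0, shard 2:
    poolShard2, err2 := broker.GetWorkerPool(wtypeRW, 0, 2)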
@@ -273,69 +273,59 @@ func (broker *WorkerBroker) startWorkerMonitor() (err error) {
// we can get all the pids in this call. double the size in case we
// get none-hera defunct processes. +1 in case racing casue mapsize=0.
//
defunctPids := make([]int32, 0)
for {
var status syscall.WaitStatus

//Reap exited children in non-blocking mode
pid, err := syscall.Wait4(-1, &status, syscall.WNOHANG, nil)
if pid > 0 {
if logger.GetLogger().V(logger.Verbose) {
logger.GetLogger().Log(logger.Verbose, "received worker exit signal for pid:", pid, " status: ", status)
}
defunctPids = append(defunctPids, int32(pid))
} else if pid == 0 {
break
} else {
if errors.Is(err, syscall.ECHILD) {
break
} else {
logger.GetLogger().Log(logger.Warning, "error in wait signal: ", err)
}
}
var arraySize = 2*len(broker.pidworkermap) + 1
var defunctPids = make([]int32, arraySize)
if logger.GetLogger().V(logger.Verbose) {
logger.GetLogger().Log(logger.Verbose, "Wait SIGCHLD len=", arraySize-1, ", pwmap:", broker.pidworkermap)
}

if len(defunctPids) > 0 {
if logger.GetLogger().V(logger.Debug) {
logger.GetLogger().Log(logger.Debug, "worker exit signal received from pids :", defunctPids)
if arraySize > 0 {
utility.ReapDefunctPids(defunctPids)
}
if logger.GetLogger().V(logger.Info) {
logger.GetLogger().Log(logger.Info, "exited worker", defunctPids)
}
broker.lock.Lock()
for i := 0; i < arraySize; i++ {
//
// last valid entry in stoppedpids is followed by one or more zeros.
//
if defunctPids[i] == 0 {
break
}
broker.lock.Lock()
for _, pid := range defunctPids {
var workerclient = broker.pidworkermap[pid]
if workerclient != nil {
delete(broker.pidworkermap, pid)
pool, err := GetWorkerBrokerInstance().GetWorkerPool(workerclient.Type, workerclient.instID, workerclient.shardID)
if err != nil {
if logger.GetLogger().V(logger.Alert) {
logger.GetLogger().Log(logger.Alert, "Can't get pool for", workerclient, ":", err)
}
} else {
//
// a worker could be terminated while serving a request.
// in these cases, doRead() in workerclient will get an
// EOF and exit. doSession() in coordinator will get the
// worker outCh closed event and exit, at which point
// coordinator itself calls returnworker to set connstate
// from assign to idle.
// no need to publish the following event again.
//
//if (workerclient.Status == WAIT) || (workerclient.Status == BUSY) {
// GetStateLog().PublishStateEvent(StateEvent{eType:ConnStateEvt, shardId:workerclient.shardId, wType:workerclient.Type, instId:workerclient.instId, oldCState:Assign, newCState:Idle})
//}
if logger.GetLogger().V(logger.Debug) {
logger.GetLogger().Log(logger.Debug, "worker (id=", workerclient.ID, "pid=", workerclient.pid, ") received signal. transits from state ", workerclient.Status, " to terminated.")
}
workerclient.setState(wsUnset) // Set the state to UNSET to make sure worker does not stay in FNSH state so long
pool.RestartWorker(workerclient)
var workerclient = broker.pidworkermap[defunctPids[i]]
if workerclient != nil {
delete(broker.pidworkermap, defunctPids[i])
pool, err := GetWorkerBrokerInstance().GetWorkerPool(workerclient.Type, workerclient.instID, workerclient.shardID)
if err != nil {
if logger.GetLogger().V(logger.Alert) {
logger.GetLogger().Log(logger.Alert, "Can't get pool for", workerclient, ":", err)
}
} else {
if logger.GetLogger().V(logger.Alert) {
logger.GetLogger().Log(logger.Alert, "Exited worker pid =", pid, " not found")
//
// a worker could be terminated while serving a request.
// in these cases, doRead() in workerclient will get an
// EOF and exit. doSession() in coordinator will get the
// worker outCh closed event and exit, at which point
// coordinator itself calls returnworker to set connstate
// from assign to idle.
// no need to publish the following event again.
//
//if (workerclient.Status == WAIT) || (workerclient.Status == BUSY) {
// GetStateLog().PublishStateEvent(StateEvent{eType:ConnStateEvt, shardId:workerclient.shardId, wType:workerclient.Type, instId:workerclient.instId, oldCState:Assign, newCState:Idle})
//}
if logger.GetLogger().V(logger.Debug) {
logger.GetLogger().Log(logger.Debug, "worker (pid=", workerclient.pid, ") received signal. transits from state ", workerclient.Status, " to terminated.")
}
workerclient.setState(wsUnset) // Set the state to UNSET to make sure worker does not stay in FNSH state so long
pool.RestartWorker(workerclient)
}
} else {
if logger.GetLogger().V(logger.Alert) {
logger.GetLogger().Log(logger.Alert, "Exited worker pid =", defunctPids[i], " not found")
}
}
broker.lock.Unlock()
}
broker.lock.Unlock()
case syscall.SIGTERM:
if logger.GetLogger().V(logger.Debug) {
logger.GetLogger().Log(logger.Debug, "Got SIGTERM")
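Both variants of this hunk drain exited children after a SIGCHLD; the loop form calls syscall.Wait4 with WNOHANG, which returns a pid while zombies remain, 0 when none are currently pending, and ECHILD when the process has no children at all. A standalone sketch of that non-blocking reap pattern:

    var reaped []int32
    for {
        var status syscall.WaitStatus
        pid, err := syscall.Wait4(-1, &status, syscall.WNOHANG, nil)
        if pid > 0 {
            reaped = append(reaped, int32(pid)) // one exited child collected
            continue
        }
        if pid == 0 || errors.Is(err, syscall.ECHILD) {
            break // nothing (more) to reap right now
        }
        if errors.Is(err, syscall.EINTR) {
            continue // interrupted by a signal; retry
        }
        break // any other error: give up and let the next SIGCHLD retry
    }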
@@ -375,8 +365,8 @@ }
}

/*
resizePool calls workerpool.Resize to resize a worker pool when the dynamic configuration of
the number of workers changed
resizePool calls workerpool.Resize to resize a worker pool when the dynamic configuration of
the number of workers changed
*/
func (broker *WorkerBroker) resizePool(wType HeraWorkerType, maxWorkers int, shardID int) {
broker.poolCfgs[0][wType].maxWorkerCnt = maxWorkers
@@ -391,7 +381,7 @@ func (broker *WorkerBroker) resizePool(wType HeraWorkerType, maxWorkers int, sha
}

/*
changeMaxWorkers is called when the dynamic config changed, it calls resizePool() for all the pools
changeMaxWorkers is called when the dynamic config changed, it calls resizePool() for all the pools
*/
func (broker *WorkerBroker) changeMaxWorkers() {
wW := GetNumWWorkers(0)