diff --git a/lib/adaptivequemgr.go b/lib/adaptivequemgr.go
index 4fdd2860..8e49e2e0 100644
--- a/lib/adaptivequemgr.go
+++ b/lib/adaptivequemgr.go
@@ -21,8 +21,8 @@ import (
 	"errors"
 	"fmt"
 	"math/rand"
-	"strings"
 	"sync/atomic"
+	"strings"
 	"time"
 
 	"github.com/paypal/hera/cal"
@@ -118,7 +118,7 @@ type BindCount struct {
 	Workers map[string]*WorkerClient // lookup by ticket
 }
 
-func bindEvictNameOk(bindName string) bool {
+func bindEvictNameOk(bindName string) (bool) {
 	commaNames := GetConfig().BindEvictionNames
 	if len(commaNames) == 0 {
 		// for tests, allow all names to be subject to bind eviction
@@ -126,7 +126,7 @@ func bindEvictNameOk(bindName string) bool {
 	}
 	commaNames = strings.ToLower(commaNames)
 	bindName = strings.ToLower(bindName)
-	for _, okSubname := range strings.Split(commaNames, ",") {
+	for _, okSubname := range strings.Split(commaNames,",") {
 		if strings.Contains(bindName, okSubname) {
 			return true
 		}
@@ -134,15 +134,12 @@ func bindEvictNameOk(bindName string) bool {
 	return false
 }
 
-/*
-	A bad query with multiple binds will add independent bind throttles to all
-
-bind name and values
-*/
-func (mgr *adaptiveQueueManager) doBindEviction() int {
+/* A bad query with multiple binds will add independent bind throttles to all
+bind name and values */
+func (mgr *adaptiveQueueManager) doBindEviction() (int) {
 	throttleCount := 0
 	GetBindEvict().lock.Lock()
-	for _, keyValues := range GetBindEvict().BindThrottle {
+	for _,keyValues := range GetBindEvict().BindThrottle {
 		throttleCount += len(keyValues)
 	}
 	GetBindEvict().lock.Unlock()
@@ -175,14 +172,14 @@ func (mgr *adaptiveQueueManager) doBindEviction() int {
 			}
 			continue
 		}
-		contextBinds := parseBinds(request)
-		sqlsrcPrefix := worker.clientHostPrefix.Load().(string)
-		sqlsrcApp := worker.clientApp.Load().(string)
+                contextBinds := parseBinds(request)
+                sqlsrcPrefix := worker.clientHostPrefix.Load().(string)
+                sqlsrcApp := worker.clientApp.Load().(string)
 		if sqlsrcPrefix != "" {
 			contextBinds[SrcPrefixAppKey] = fmt.Sprintf("%s%s", sqlsrcPrefix, sqlsrcApp)
 			if logger.GetLogger().V(logger.Debug) {
-				msg := fmt.Sprintf("Req info: Add AZ+App to contextBinds: %s", contextBinds[SrcPrefixAppKey])
-				logger.GetLogger().Log(logger.Debug, msg)
+                                msg := fmt.Sprintf("Req info: Add AZ+App to contextBinds: %s", contextBinds[SrcPrefixAppKey])
+                                logger.GetLogger().Log(logger.Debug, msg)
 			}
 		}
 		for bindName0, bindValue := range contextBinds {
@@ -203,8 +200,8 @@ func (mgr *adaptiveQueueManager) doBindEviction() int {
 			}
 			concatKey := fmt.Sprintf("%d|%s|%s", sqlhash, bindName, bindValue)
 			if logger.GetLogger().V(logger.Debug) {
-				msg := fmt.Sprintf("Req info: lookup concatKey = %s in bindCounts", concatKey)
-				logger.GetLogger().Log(logger.Debug, msg)
+                                msg := fmt.Sprintf("Req info: lookup concatKey = %s in bindCounts", concatKey)
+                                logger.GetLogger().Log(logger.Debug, msg)
 			}
 			entry, ok := bindCounts[concatKey]
 			if !ok {
@@ -213,7 +210,7 @@ func (mgr *adaptiveQueueManager) doBindEviction() int {
 					Name:    bindName,
 					Value:   bindValue,
 					Workers: make(map[string]*WorkerClient),
-				}
+					}
 				bindCounts[concatKey] = entry
 			}
 
@@ -230,7 +227,7 @@ func (mgr *adaptiveQueueManager) doBindEviction() int {
 		bindName := entry.Name
 		bindValue := entry.Value
 
-		if len(entry.Workers) < int(float64(GetConfig().BindEvictionThresholdPct)/100.*float64(numDispatchedWorkers)) {
+		if len(entry.Workers) < int( float64(GetConfig().BindEvictionThresholdPct)/100.*float64(numDispatchedWorkers) ) {
 			continue
 		}
 		// evict sqlhash, bindvalue
@@ -244,7 +241,7 @@ func (mgr *adaptiveQueueManager) doBindEviction() int {
 			if mgr.dispatchedWorkers[worker] != ticket ||
 				worker.Status == wsFnsh ||
-				atomic.LoadInt32(&worker.isUnderRecovery) == 1 /* Recover() uses compare & swap */ {
+				worker.isUnderRecovery == 1 /* Recover() uses compare & swap */ {
 
 				continue
 			}
@@ -277,10 +274,10 @@ func (mgr *adaptiveQueueManager) doBindEviction() int {
 				throttle.incrAllowEveryX()
 			} else {
 				throttle := BindThrottle{
-					Name:        bindName,
-					Value:       bindValue,
-					Sqlhash:     sqlhash,
-					AllowEveryX: 3*len(entry.Workers) + 1,
+					Name:    bindName,
+					Value:   bindValue,
+					Sqlhash: sqlhash,
+					AllowEveryX: 3*len(entry.Workers) + 1,
 				}
 				now := time.Now()
 				throttle.RecentAttempt.Store(&now)
@@ -467,7 +464,7 @@ func (mgr *adaptiveQueueManager) getWorkerToRecover() (*WorkerClient, bool) {
 			}
 		}
 	} else {
-		if worker != nil && worker.Status == wsFnsh {
+		if worker != nil && worker.Status == wsFnsh  {
 			if logger.GetLogger().V(logger.Warning) {
 				logger.GetLogger().Log(logger.Warning, "worker.pid state is in FNSH, so skipping", worker.pid)
 			}
diff --git a/lib/util.go b/lib/util.go
index 1ba6930a..1852810d 100644
--- a/lib/util.go
+++ b/lib/util.go
@@ -79,8 +79,8 @@ func IsPidRunning(pid int) (isRunning bool) {
 }
 
 /*
-1st return value: the number
-2nd return value: the number of digits
+   1st return value: the number
+   2nd return value: the number of digits
 */
 func atoi(bf []byte) (int, int) {
 	sz := len(bf)
@@ -96,8 +96,8 @@ func atoi(bf []byte) (int, int) {
 }
 
 /*
-1st return value: the number
-2nd return value: the number of digits
+   1st return value: the number
+   2nd return value: the number of digits
 */
 func atoui(str string) (uint64, int) {
 	sz := len(str)
@@ -164,13 +164,3 @@ func ExtractSQLHash(request *netstring.Netstring) (uint32, bool) {
 	}
 	return 0, false
 }
-
-// Contains This is utility method to check whether value present in list or not
-func Contains[T comparable](slice []T, value T) bool {
-	for _, val := range slice {
-		if val == value {
-			return true
-		}
-	}
-	return false
-}
diff --git a/lib/workerbroker.go b/lib/workerbroker.go
index 05265754..0f9df171 100644
--- a/lib/workerbroker.go
+++ b/lib/workerbroker.go
@@ -19,11 +19,13 @@ package lib
 
 import (
 	"errors"
-	"github.com/paypal/hera/utility/logger"
 	"os"
 	"os/signal"
 	"sync"
 	"syscall"
+
+	"github.com/paypal/hera/utility"
+	"github.com/paypal/hera/utility/logger"
 )
 
 // HeraWorkerType defines the possible worker type
@@ -62,7 +64,7 @@ type WorkerBroker struct {
 	//      and restart the stopped workers.
 	//
 	pidworkermap map[int32]*WorkerClient
-	lock         sync.Mutex
+	lock sync.Mutex
 
 	//
 	// loaded from cfg once and used later.
@@ -202,9 +204,7 @@ func (broker *WorkerBroker) GetWorkerPoolCfgs() (pCfgs []map[HeraWorkerType]*Wor
 
 // GetWorkerPool get the worker pool object for the type and id
 // ids holds optional paramenters.
-//
-//	ids[0] == instance id; ids[1] == shard id.
-//
+//    ids[0] == instance id; ids[1] == shard id.
 // if a particular id is not set, it defaults to 0.
 // TODO: interchange sid <--> instId since instId is not yet used
 func (broker *WorkerBroker) GetWorkerPool(wType HeraWorkerType, ids ...int) (workerbroker *WorkerPool, err error) {
@@ -273,69 +273,59 @@ func (broker *WorkerBroker) startWorkerMonitor() (err error) {
 			// we can get all the pids in this call. double the size in case we
 			// get none-hera defunct processes. +1 in case racing casue mapsize=0.
 			//
-			defunctPids := make([]int32, 0)
-			for {
-				var status syscall.WaitStatus
-
-				//Reap exited children in non-blocking mode
-				pid, err := syscall.Wait4(-1, &status, syscall.WNOHANG, nil)
-				if pid > 0 {
-					if logger.GetLogger().V(logger.Verbose) {
-						logger.GetLogger().Log(logger.Verbose, "received worker exit signal for pid:", pid, " status: ", status)
-					}
-					defunctPids = append(defunctPids, int32(pid))
-				} else if pid == 0 {
-					break
-				} else {
-					if errors.Is(err, syscall.ECHILD) {
-						break
-					} else {
-						logger.GetLogger().Log(logger.Warning, "error in wait signal: ", err)
-					}
-				}
+			var arraySize = 2*len(broker.pidworkermap) + 1
+			var defunctPids = make([]int32, arraySize)
+			if logger.GetLogger().V(logger.Verbose) {
+				logger.GetLogger().Log(logger.Verbose, "Wait SIGCHLD len=", arraySize-1, ", pwmap:", broker.pidworkermap)
 			}
-
-			if len(defunctPids) > 0 {
-				if logger.GetLogger().V(logger.Debug) {
-					logger.GetLogger().Log(logger.Debug, "worker exit signal received from pids :", defunctPids)
+			if arraySize > 0 {
+				utility.ReapDefunctPids(defunctPids)
+			}
+			if logger.GetLogger().V(logger.Info) {
+				logger.GetLogger().Log(logger.Info, "exited worker", defunctPids)
+			}
+			broker.lock.Lock()
+			for i := 0; i < arraySize; i++ {
+				//
+				// last valid entry in stoppedpids is followed by one or more zeros.
+				//
+				if defunctPids[i] == 0 {
+					break
 				}
-				broker.lock.Lock()
-				for _, pid := range defunctPids {
-					var workerclient = broker.pidworkermap[pid]
-					if workerclient != nil {
-						delete(broker.pidworkermap, pid)
-						pool, err := GetWorkerBrokerInstance().GetWorkerPool(workerclient.Type, workerclient.instID, workerclient.shardID)
-						if err != nil {
-							if logger.GetLogger().V(logger.Alert) {
-								logger.GetLogger().Log(logger.Alert, "Can't get pool for", workerclient, ":", err)
-							}
-						} else {
-							//
-							// a worker could be terminated while serving a request.
-							// in these cases, doRead() in workerclient will get an
-							// EOF and exit. doSession() in coordinator will get the
-							// worker outCh closed event and exit, at which point
-							// coordinator itself calls returnworker to set connstate
-							// from assign to idle.
-							// no need to publish the following event again.
-							//
-							//if (workerclient.Status == WAIT) || (workerclient.Status == BUSY) {
-							//	GetStateLog().PublishStateEvent(StateEvent{eType:ConnStateEvt, shardId:workerclient.shardId, wType:workerclient.Type, instId:workerclient.instId, oldCState:Assign, newCState:Idle})
-							//}
-							if logger.GetLogger().V(logger.Debug) {
-								logger.GetLogger().Log(logger.Debug, "worker (id=", workerclient.ID, "pid=", workerclient.pid, ") received signal. transits from state ", workerclient.Status, " to terminated.")
-							}
-							workerclient.setState(wsUnset) // Set the state to UNSET to make sure worker does not stay in FNSH state so long
-							pool.RestartWorker(workerclient)
+				var workerclient = broker.pidworkermap[defunctPids[i]]
+				if workerclient != nil {
+					delete(broker.pidworkermap, defunctPids[i])
+					pool, err := GetWorkerBrokerInstance().GetWorkerPool(workerclient.Type, workerclient.instID, workerclient.shardID)
+					if err != nil {
+						if logger.GetLogger().V(logger.Alert) {
+							logger.GetLogger().Log(logger.Alert, "Can't get pool for", workerclient, ":", err)
 						}
 					} else {
-						if logger.GetLogger().V(logger.Alert) {
-							logger.GetLogger().Log(logger.Alert, "Exited worker pid =", pid, " not found")
+						//
+						// a worker could be terminated while serving a request.
+						// in these cases, doRead() in workerclient will get an
+						// EOF and exit. doSession() in coordinator will get the
+						// worker outCh closed event and exit, at which point
+						// coordinator itself calls returnworker to set connstate
+						// from assign to idle.
+						// no need to publish the following event again.
+						//
+						//if (workerclient.Status == WAIT) || (workerclient.Status == BUSY) {
+						//	GetStateLog().PublishStateEvent(StateEvent{eType:ConnStateEvt, shardId:workerclient.shardId, wType:workerclient.Type, instId:workerclient.instId, oldCState:Assign, newCState:Idle})
+						//}
+						if logger.GetLogger().V(logger.Debug) {
+							logger.GetLogger().Log(logger.Debug, "worker (pid=", workerclient.pid, ") received signal. transits from state ", workerclient.Status, " to terminated.")
 						}
+						workerclient.setState(wsUnset) // Set the state to UNSET to make sure worker does not stay in FNSH state so long
+						pool.RestartWorker(workerclient)
+					}
+				} else {
+					if logger.GetLogger().V(logger.Alert) {
+						logger.GetLogger().Log(logger.Alert, "Exited worker pid =", defunctPids[i], " not found")
 					}
 				}
-				broker.lock.Unlock()
 			}
+			broker.lock.Unlock()
 		case syscall.SIGTERM:
 			if logger.GetLogger().V(logger.Debug) {
 				logger.GetLogger().Log(logger.Debug, "Got SIGTERM")
@@ -375,8 +365,8 @@ func (broker *WorkerBroker) startWorkerMonitor() (err error) {
 }
 
 /*
-resizePool calls workerpool.Resize to resize a worker pool when the dynamic configuration of
-the number of workers changed
+	resizePool calls workerpool.Resize to resize a worker pool when the dynamic configuration of
+	the number of workers changed
 */
 func (broker *WorkerBroker) resizePool(wType HeraWorkerType, maxWorkers int, shardID int) {
 	broker.poolCfgs[0][wType].maxWorkerCnt = maxWorkers
@@ -391,7 +381,7 @@ func (broker *WorkerBroker) resizePool(wType HeraWorkerType, maxWorkers int, sha
 }
 
 /*
-changeMaxWorkers is called when the dynamic config changed, it calls resizePool() for all the pools
+	changeMaxWorkers is called when the dynamic config changed, it calls resizePool() for all the pools
 */
 func (broker *WorkerBroker) changeMaxWorkers() {
 	wW := GetNumWWorkers(0)
diff --git a/lib/workerclient.go b/lib/workerclient.go
index 5edbb77f..1a86d619 100644
--- a/lib/workerclient.go
+++ b/lib/workerclient.go
@@ -21,21 +21,20 @@ import (
 	"bytes"
 	"errors"
 	"fmt"
-	"github.com/paypal/hera/cal"
-	"github.com/paypal/hera/common"
-	"github.com/paypal/hera/utility/encoding/netstring"
-	"github.com/paypal/hera/utility/logger"
 	"math/rand"
 	"net"
 	"os"
 	"path/filepath"
-	"runtime"
 	"strconv"
 	"strings"
-	"sync"
 	"sync/atomic"
 	"syscall"
 	"time"
+
+	"github.com/paypal/hera/cal"
+	"github.com/paypal/hera/common"
+	"github.com/paypal/hera/utility/encoding/netstring"
+	"github.com/paypal/hera/utility/logger"
 )
 
 // HeraWorkerStatus defines the posible states the worker can be in
@@ -148,15 +147,12 @@ type WorkerClient struct {
 	rqId uint32
 
 	//
-	// under recovery. 0: no; 1: yes. use atomic.CompareAndSwapInt32 to check state and use atomic.LoadInt32 to read state
+	// under recovery. 0: no; 1: yes. use atomic.CompareAndSwapInt32 to check state.
 	//
 	isUnderRecovery int32
 
 	// Throtle workers lifecycle
 	thr Throttler
-
-	//mutex lock to update state from single go-routine
-	stateLock sync.Mutex
 }
 
 type strandedCalInfo struct {
@@ -204,7 +200,7 @@ func NewWorker(wid int, wType HeraWorkerType, instID int, shardID int, moduleNam
 	}
 	// TODO
 	worker.racID = -1
-	atomic.CompareAndSwapInt32(&worker.isUnderRecovery, 1, 0)
+	worker.isUnderRecovery = 0
 	if worker.ctrlCh != nil {
 		close(worker.ctrlCh)
 	}
@@ -214,7 +210,6 @@ func NewWorker(wid int, wType HeraWorkerType, instID int, shardID int, moduleNam
 	// msg. if adaptiveqmgr blocks on a non-buffered channel, there is a deadlock when return worker
 	//
 	worker.ctrlCh = make(chan *workerMsg, 5)
-
 	return worker
 }
@@ -638,12 +633,15 @@ type WorkerClientRecoverParam struct {
 func (worker *WorkerClient) Recover(p *WorkerPool, ticket string, recovParam WorkerClientRecoverParam, info *strandedCalInfo, param ...int) {
 	if atomic.CompareAndSwapInt32(&worker.isUnderRecovery, 0, 1) {
 		if logger.GetLogger().V(logger.Debug) {
-			logger.GetLogger().Log(logger.Debug, "begin recover worker Id: ", worker.ID, " process Id: ", worker.pid)
+			logger.GetLogger().Log(logger.Debug, "begin recover worker: ", worker.pid)
 		}
 	} else {
 		if logger.GetLogger().V(logger.Debug) {
-			logger.GetLogger().Log(logger.Debug, "worker already underrecovery: ", worker.ID, " process Id: ", worker.pid)
+			logger.GetLogger().Log(logger.Debug, "worker already underrecovery: ", worker.pid)
 		}
+		//
+		// defer will not be called.
+		//
 		return
 	}
 	defer func() {
@@ -667,9 +665,6 @@ func (worker *WorkerClient) Recover(p *WorkerPool, ticket string, recovParam Wor
 		return
 	}
 	priorWorkerStatus := worker.Status
-	if logger.GetLogger().V(logger.Debug) {
-		logger.GetLogger().Log(logger.Debug, fmt.Sprintf("about to recover worker Id: %d, worker process Id: %d as part of reconvery process, setting worker state to Quece", worker.ID, worker.pid))
-	}
 	worker.setState(wsQuce)
 	killparam := common.StrandedClientClose
 	if len(param) > 0 {
@@ -682,12 +677,8 @@ func (worker *WorkerClient) Recover(p *WorkerPool, ticket string, recovParam Wor
 	case <-workerRecoverTimeout:
 		worker.thr.CanRun()
 		worker.setState(wsInit) // Set the worker state to INIT when we decide to Terminate the worker
-		GetStateLog().PublishStateEvent(StateEvent{eType: WorkerStateEvt, shardID: worker.shardID, wType: worker.Type, instID: worker.instID, workerID: worker.ID, newWState: worker.Status})
 		worker.Terminate()
 		worker.callogStranded("RECYCLED", info)
-		if logger.GetLogger().V(logger.Debug) {
-			logger.GetLogger().Log(logger.Debug, fmt.Sprintf("worker Id: %d and process: %d recovered as part of workerRecoverTimeout set status to INIT", worker.ID, worker.pid))
-		}
 		return
 	case msg, ok := <-worker.channel():
 		if !ok {
@@ -723,10 +714,8 @@ func (worker *WorkerClient) Recover(p *WorkerPool, ticket string, recovParam Wor
 				logger.GetLogger().Log(logger.Info, "stranded conn recovered", worker.Type, worker.pid)
 			}
 			worker.callogStranded("RECOVERED", info)
+
 			worker.setState(wsFnsh)
-			if logger.GetLogger().V(logger.Debug) {
-				logger.GetLogger().Log(logger.Debug, fmt.Sprintf("worker Id: %d, worker process: %d recovered as part of message from channel set status to FINSH", worker.ID, worker.pid))
-			}
 			p.ReturnWorker(worker, ticket)
 			//
 			// donot set state to ACPT since worker could already be picked up by another
@@ -915,7 +904,7 @@ func (worker *WorkerClient) doRead() {
 				worker.setState(wsWait)
 			}
 			if eor != common.EORMoreIncomingRequests {
-				worker.outCh <- &workerMsg{data: payload, eor: true, free: (eor == common.EORFree), inTransaction: ((eor == common.EORInTransaction) || (eor == common.EORInCursorInTransaction)), rqId: uint32(rqId)}
+				worker.outCh <- &workerMsg{data: payload, eor: true, free: (eor == common.EORFree), inTransaction: ((eor == common.EORInTransaction) || (eor == common.EORInCursorInTransaction)), rqId: rqId}
 				payload = nil
 			} else {
 				// buffer data to avoid race condition
@@ -952,13 +941,8 @@ func (worker *WorkerClient) doRead() {
 
 // Write sends a message to the worker
 func (worker *WorkerClient) Write(ns *netstring.Netstring, nsCount uint16) error {
-	if atomic.LoadInt32(&worker.isUnderRecovery) == 1 {
-		if logger.GetLogger().V(logger.Alert) {
-			logger.GetLogger().Log(logger.Alert, "workerclient write error: worker is under recovery.")
-		}
-		return ErrWorkerFail
-	}
 	worker.setState(wsBusy)
+
 	worker.rqId += uint32(nsCount)
 
 	//
@@ -982,19 +966,16 @@ func (worker *WorkerClient) Write(ns *netstring.Netstring, nsCount uint16) error
 
 // setState updates the worker state
 func (worker *WorkerClient) setState(status HeraWorkerStatus) {
-	currentStatus := worker.Status
-	if currentStatus == status {
+	if worker.Status == status {
 		return
 	}
-	if atomic.LoadInt32(&worker.isUnderRecovery) == 1 && (status == wsWait || status == wsBusy) {
-		logger.GetLogger().Log(logger.Warning, "worker : ", worker.ID, "processId: ", worker.pid, " seeing invalid state transition from ", currentStatus, " to ", status)
-		if logger.GetLogger().V(logger.Debug) {
-			worker.printCallStack()
-		}
-		return
+	if logger.GetLogger().V(logger.Debug) {
+		logger.GetLogger().Log(logger.Debug, "worker pid=", worker.pid, " changing status from", worker.Status, "to", status)
 	}
-	//This checks whether state transition is valid or not
+
+	// TODO: sync atomic set
 	worker.Status = status
+
 	GetStateLog().PublishStateEvent(StateEvent{eType: WorkerStateEvt, shardID: worker.shardID, wType: worker.Type, instID: worker.instID, workerID: worker.ID, newWState: status})
 }
@@ -1021,27 +1002,3 @@ func (worker *WorkerClient) isProcessRunning() bool {
 	}
 	return true
 }
-
-func (worker *WorkerClient) printCallStack() {
-	// Define a large enough buffer to capture the stack.
-	const depth = 64
-	pcs := make([]uintptr, depth)
-
-	// Collect the stack trace.
-	n := runtime.Callers(2, pcs) // Skip the first 2 callers (runtime and printCallStack itself).
-	frames := runtime.CallersFrames(pcs[:n])
-	indent := 0
-	// Iterate through the frames and print function names and line numbers.
-	var builder strings.Builder
-	builder.WriteString(fmt.Sprintf("worker Id= %d Process Id= %d Call Stack:", worker.ID, worker.pid))
-	for {
-		frame, more := frames.Next()
-		builder.WriteString(fmt.Sprintf("%s - %s\n", strings.Repeat(" ", indent), frame.Function))
-		builder.WriteString(fmt.Sprintf("%s at %s:%d\n", strings.Repeat(" ", indent), frame.File, frame.Line))
-		indent++
-		if !more {
-			break
-		}
-	}
-	logger.GetLogger().Log(logger.Debug, builder.String())
-}
diff --git a/lib/workerpool.go b/lib/workerpool.go
index 37d96855..50aab16f 100644
--- a/lib/workerpool.go
+++ b/lib/workerpool.go
@@ -193,6 +193,7 @@ func (pool *WorkerPool) RestartWorker(worker *WorkerClient) (err error) {
 	}
 	pool.activeQ.Remove(worker)
 	pool.poolCond.L.Unlock()
+
 	go pool.spawnWorker(worker.ID)
 	return nil
 }
diff --git a/lib/workerpool_test.go b/lib/workerpool_test.go
index 14b8cf03..4e3db96d 100644
--- a/lib/workerpool_test.go
+++ b/lib/workerpool_test.go
@@ -76,13 +76,6 @@ func TestPoolDempotency(t *testing.T) {
 	wd := NewWorker(3, wtypeRW, 0, 0, "cloc", nil)
 	we := NewWorker(4, wtypeRW, 0, 0, "cloc", nil)
 	wf := NewWorker(5, wtypeRW, 0, 0, "cloc", nil)
-	wa.setState(wsInit)
-	wb.setState(wsInit)
-	wc.setState(wsInit)
-	wd.setState(wsInit)
-	we.setState(wsInit)
-	wf.setState(wsInit)
-
 	wa.setState(wsAcpt)
 	wb.setState(wsAcpt)
 	wc.setState(wsAcpt)
diff --git a/tests/functionaltest/strandedchild_tests/dmldisconnect/main_test.go b/tests/functionaltest/strandedchild_tests/dmldisconnect/main_test.go
index f2430977..e261b35a 100644
--- a/tests/functionaltest/strandedchild_tests/dmldisconnect/main_test.go
+++ b/tests/functionaltest/strandedchild_tests/dmldisconnect/main_test.go
@@ -1,14 +1,13 @@
-package main
-
+package main
 import (
 	"context"
 	"database/sql"
 	"fmt"
-	"github.com/paypal/hera/tests/functionaltest/testutil"
-	"github.com/paypal/hera/utility/logger"
 	"os"
 	"testing"
 	"time"
+	"github.com/paypal/hera/tests/functionaltest/testutil"
+	"github.com/paypal/hera/utility/logger"
 )
 
 /*
@@ -18,6 +17,7 @@ No setup needed
 
 */
 
+
 var mx testutil.Mux
 var tableName string
 
@@ -29,8 +29,8 @@ func cfg() (map[string]string, map[string]string, testutil.WorkerType) {
 	appcfg["log_level"] = "5"
 	appcfg["log_file"] = "hera.log"
 	appcfg["rac_sql_interval"] = "0"
-	appcfg["opscfg.default.server.idle_timeout_ms"] = "3000"
-	appcfg["opscfg.default.server.transaction_idle_timeout_ms"] = "5000"
+        appcfg["opscfg.default.server.idle_timeout_ms"] = "3000"
+        appcfg["opscfg.default.server.transaction_idle_timeout_ms"] = "5000"
 
 	appcfg["child.executable"] = "mysqlworker"
 	appcfg["database_type"] = "mysql"
@@ -43,16 +43,18 @@ func cfg() (map[string]string, map[string]string, testutil.WorkerType) {
 func setupDb() error {
 	testutil.RunDML("DROP TABLE IF EXISTS test_simple_table_2")
-	return testutil.RunDML("CREATE TABLE test_simple_table_2 (accountID VARCHAR(64) PRIMARY KEY, NAME VARCHAR(64), STATUS VARCHAR(64), CONDN VARCHAR(64))")
+        return testutil.RunDML("CREATE TABLE test_simple_table_2 (accountID VARCHAR(64) PRIMARY KEY, NAME VARCHAR(64), STATUS VARCHAR(64), CONDN VARCHAR(64))")
 }
 
+
 func TestMain(m *testing.M) {
 	os.Exit(testutil.UtilMain(m, cfg, setupDb))
 }
 
+
 /*
 ##########################################################################################
- # Perform an insert without commit
- # While the query is in transaction, close connection
+ # Perform an insert without commit 
+ # While the query is in transaction, close connection 
  # Verify worker get stranded and recovered
 ##########################################################################################
 */
@@ -61,53 +63,54 @@ func TestDmlDisconnect(t *testing.T) {
 	logger.GetLogger().Log(logger.Debug, "TestDmlDisconnect begin +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n")
 
 	hostname := testutil.GetHostname()
-	fmt.Println("Hostname: ", hostname)
-	db, err := sql.Open("hera", hostname+":31002")
-	if err != nil {
-		t.Fatal("Error starting Mux:", err)
-		return
-	}
+        fmt.Println ("Hostname: ", hostname);
+        db, err := sql.Open("hera", hostname + ":31002")
+        if err != nil {
+                t.Fatal("Error starting Mux:", err)
+                return
+        }
 	db.SetMaxIdleConns(0)
 	defer db.Close()
 
-	fmt.Println("Open new connection")
-	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
-	conn, err := db.Conn(ctx)
-	if err != nil {
-		t.Fatalf("Error getting connection %s\n", err.Error())
-	}
+        fmt.Println ("Open new connection");
+        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+        conn, err := db.Conn(ctx)
+        if err != nil {
+                t.Fatalf("Error getting connection %s\n", err.Error())
+        }
 
-	fmt.Println("Perform an insert without commit")
-	stmt, _ := conn.PrepareContext(ctx, "/*TestBasic*/ insert into test_simple_table_2 (accountID, Name, Status) VALUES (12345, 'Linda Smith' , '111')")
+        fmt.Println ("Perform an insert without commit");
+        stmt, _ := conn.PrepareContext(ctx, "/*TestBasic*/ insert into test_simple_table_2 (accountID, Name, Status) VALUES (12345, 'Linda Smith' , '111')")
 	_, err = stmt.Exec()
 	if err != nil {
-		t.Fatalf("Error preparing test (create row in table) %s\n", err.Error())
+                t.Fatalf("Error preparing test (create row in table) %s\n", err.Error())
+        }
+        stmt.Close()
+        cancel()
+        fmt.Println ("Close connection while insert query is in transaction");
+        conn.Close()
+
+        time.Sleep(1 * time.Second);
+        fmt.Println ("Verify worker get stranded and recovered");
+        if ( testutil.RegexCount("begin recover worker:") < 1) {
+                t.Fatalf ("Error: should have worker recovered");
+        }
+
+        if ( testutil.RegexCount("stranded conn recovered") < 1) {
+                t.Fatalf ("Error: should have stranded conn recovered");
+        }
+
+        fmt.Println ("Verify worker recovery is seen in CALlog")
+        count := testutil.RegexCountFile ("RECOVER.*dedicated.*0", "cal.log")
+        if (count < 1) {
+                t.Fatalf ("Error: should see worker recovery event");
 	}
-	stmt.Close()
-	cancel()
-	fmt.Println("Close connection while insert query is in transaction")
-	conn.Close()
-
-	time.Sleep(1 * time.Second)
-	fmt.Println("Verify worker get stranded and recovered")
-	if testutil.RegexCount("begin recover worker") < 1 {
-		t.Fatalf("Error: should have worker recovered")
-	}
-
-	if testutil.RegexCount("stranded conn recovered") < 1 {
-		t.Fatalf("Error: should have stranded conn recovered")
-	}
-
-	fmt.Println("Verify worker recovery is seen in CALlog")
-	count := testutil.RegexCountFile("RECOVER.*dedicated.*0", "cal.log")
-	if count < 1 {
-		t.Fatalf("Error: should see worker recovery event")
-	}
-	count = testutil.RegexCountFile("STRANDED.*RECOVERED.*0", "cal.log")
-	if count < 1 {
-		t.Fatalf("Error: should see worker recovery event")
+        count = testutil.RegexCountFile ("STRANDED.*RECOVERED.*0", "cal.log")
+        if (count < 1) {
+                t.Fatalf ("Error: should see worker recovery event");
 	}
 	logger.GetLogger().Log(logger.Debug, "TestDmlDisconnect done -------------------------------------------------------------")
 }
+
diff --git a/tests/unittest/sqlEvict/main_test.go b/tests/unittest/sqlEvict/main_test.go
index 5a704796..5cf84296 100644
--- a/tests/unittest/sqlEvict/main_test.go
+++ b/tests/unittest/sqlEvict/main_test.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"database/sql"
 	"fmt"
-	"math/rand"
 	"os"
 	"testing"
 	"time"
@@ -34,11 +33,11 @@ func cfg() (map[string]string, map[string]string, testutil.WorkerType) {
 	appcfg["bind_eviction_threshold_pct"] = "50"
 
 	appcfg["request_backlog_timeout"] = "1000"
-	appcfg["soft_eviction_probability"] = "10"
+	appcfg["soft_eviction_probability"] = "100"
 
 	opscfg := make(map[string]string)
-	max_conn = 50
-	opscfg["opscfg.default.server.max_connections"] = fmt.Sprintf("%d", 10)
+	max_conn = 25
+	opscfg["opscfg.default.server.max_connections"] = fmt.Sprintf("%d", int(max_conn))
 	opscfg["opscfg.default.server.log_level"] = "5"
 
 	opscfg["opscfg.default.server.saturation_recover_threshold"] = "10"
@@ -100,56 +99,6 @@ func sleepyQ(conn *sql.Conn, delayRow int) error {
 	return nil
 }
 
-func sleepyDmlQ(conn *sql.Conn, delayRow int) error {
-	inserQuery := "insert into sleep_info (id,seconds) values (:id, sleep_option(:seconds))"
-	updateQuery := "update sleep_info set seconds = sleep_option(:seconds) where id=:id"
-	defer func(conn *sql.Conn) {
-		err := conn.Close()
-		if err != nil {
-			fmt.Printf("Error closing conn %s\n", err.Error())
-		}
-	}(conn)
-	tx, _ := conn.BeginTx(context.Background(), nil)
-	inst1, err := conn.PrepareContext(context.Background(), inserQuery)
-	if err != nil {
-		fmt.Printf("Error preparing sleepyDmlQ %s\n", err.Error())
-		return err
-	}
-	defer func(inst1 *sql.Stmt) {
-		err := inst1.Close()
-		if err != nil {
-			fmt.Printf("Error closing insert statement sleepyDmlQ %s\n", err.Error())
-		}
-	}(inst1)
-	_, err = inst1.ExecContext(context.Background(), sql.Named("id", rand.Int()), sql.Named("seconds", delayRow))
-	if err != nil {
-		fmt.Printf("Error query sleepyDmlQ %s\n", err.Error())
-		return err
-	}
-	updateStmt, err := conn.PrepareContext(context.Background(), updateQuery)
-	if err != nil {
-		fmt.Printf("Error preparing sleepyDmlQ %s\n", err.Error())
-		return err
-	}
-	defer func(updateStmt *sql.Stmt) {
-		err := updateStmt.Close()
-		if err != nil {
-			fmt.Printf("Error closing update statement sleepyDmlQ %s\n", err.Error())
-		}
-	}(updateStmt)
-	_, err = updateStmt.ExecContext(context.Background(), sql.Named("id", rand.Int()), sql.Named("seconds", delayRow))
-	if err != nil {
-		fmt.Printf("Error query sleepyDmlQ %s\n", err.Error())
-		return err
-	}
-	err = tx.Commit()
-	if err != nil {
-		fmt.Printf("Error committing sleepyDmlQ %s\n", err.Error())
-		return err
-	}
-	return nil
-}
-
 func simpleEvict() {
 	db, err := sql.Open("hera", "127.0.0.1:31002")
 	if err != nil {
@@ -208,7 +157,10 @@ func TestSqlEvict(t *testing.T) {
 	simpleEvict()
 	if testutil.RegexCountFile("HERA-100: backlog timeout", "hera.log") == 0 {
 		t.Fatal("backlog timeout was not triggered")
-	}
+	} // */
+	/* if (testutil.RegexCountFile("coordinator dispatchrequest: no worker HERA-102: backlog eviction", "hera.log") == 0) {
+		t.Fatal("backlog eviction was not triggered")
+	} // */
 	if testutil.RegexCountFile("coordinator dispatchrequest: no worker HERA-104: saturation soft sql eviction", "hera.log") == 0 {
 		t.Fatal("soft eviction was not triggered")
 	}
@@ -216,81 +168,5 @@ func TestSqlEvict(t *testing.T) {
 		t.Fatal("eviction was not triggered")
 	}
 	logger.GetLogger().Log(logger.Debug, "TestSqlEvict stop +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n")
-	time.Sleep(10 * time.Second)
+	time.Sleep(2 * time.Second)
 } // */
-
-func TestSqlEvictDML(t *testing.T) {
-	logger.GetLogger().Log(logger.Debug, "TestSqlEvictDML begin +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n")
-	dmlEvict()
-	if testutil.RegexCountFile("HERA-100: backlog timeout", "hera.log") == 0 {
-		t.Fatal("backlog timeout was not triggered")
-	}
-	if testutil.RegexCountFile("coordinator dispatchrequest: no worker HERA-104: saturation soft sql eviction", "hera.log") == 0 {
-		t.Fatal("soft eviction was not triggered")
-	}
-	if testutil.RegexCountFile("coordinator dispatchrequest: stranded conn HERA-101: saturation kill", "hera.log") == 0 {
-		t.Fatal("eviction was not triggered")
-	}
-	logger.GetLogger().Log(logger.Debug, "TestSqlEvictDML stop +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n")
-	time.Sleep(10 * time.Second)
-}
-
-func dmlEvict() {
-	db, err := sql.Open("hera", "127.0.0.1:31002")
-	if err != nil {
-		fmt.Printf("Error db %s\n", err.Error())
-		return
-	}
-	db.SetConnMaxLifetime(2 * time.Second)
-	db.SetMaxIdleConns(0)
-	db.SetMaxOpenConns(22111)
-	defer func(db *sql.DB) {
-		err := db.Close()
-		if err != nil {
-			fmt.Printf("Error closing db %s\n", err.Error())
-		}
-	}(db)
-
-	conn, err := db.Conn(context.Background())
-	if err != nil {
-		fmt.Printf("Error conn %s\n", err.Error())
-		return
-	}
-	err = sleepyDmlQ(conn, 1600)
-	if err != nil {
-		fmt.Printf("Error Executing first sleepyDmlQ %s\n", err.Error())
-		return
-	}
-
-	for i := 0; i < int(max_conn)+1; i++ {
-		conn, err := db.Conn(context.Background())
-		if err != nil {
-			fmt.Printf("Error #%d conn %s\n", i, err.Error())
-			continue
-		}
-		time.Sleep(time.Millisecond * 100)
-		fmt.Printf("connection count %d\n", i)
-		go func(index int) {
-			err := sleepyDmlQ(conn, 1600)
-			if err != nil {
-				fmt.Printf("Long query Request Id: %d Error executing the sleepyDmlQ %s\n", index, err.Error())
-			}
-		}(i)
-	}
-
-	for i := 0; i < 50; i++ {
-		conn, err := db.Conn(context.Background())
-		if err != nil {
-			fmt.Printf("Error #%d conn %s\n", i, err.Error())
-			continue
-		}
-		time.Sleep(time.Millisecond * 100)
-		fmt.Printf("connection count %d\n", i)
-		go func(index int) {
-			err := sleepyDmlQ(conn, 1600)
-			if err != nil {
-				fmt.Printf("Request id: %d Error executing the sleepyDmlQ %s\n", index, err.Error())
-			}
-		}(i)
-	}
-}
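Reviewer note on the isUnderRecovery changes: doBindEviction(), Write(), and setState() now read worker.isUnderRecovery directly, while Recover() still guards entry with atomic.CompareAndSwapInt32 (see the "Recover() uses compare & swap" comment kept in doBindEviction). Under the Go memory model, a plain read that can race with that CAS is a data race; the removed atomic.LoadInt32 calls were the matching read side of the pattern. A minimal, self-contained sketch of that pattern follows — hypothetical names, not code from this patch:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// worker mirrors just the isUnderRecovery flag of WorkerClient:
// 0 = not recovering, 1 = recovering.
type worker struct {
	isUnderRecovery int32
}

// recover stands in for Recover(): only the goroutine that wins the
// compare-and-swap proceeds; every other caller backs off, which is why
// Recover() returns early in its else branch.
func (w *worker) recover() bool {
	if !atomic.CompareAndSwapInt32(&w.isUnderRecovery, 0, 1) {
		return false // another goroutine is already recovering this worker
	}
	defer atomic.StoreInt32(&w.isUnderRecovery, 0) // clear the flag when done
	// ... recovery work would go here ...
	return true
}

// busy is the read side: it must pair the CAS with atomic.LoadInt32,
// since a plain read of w.isUnderRecovery from another goroutine races
// with the CAS above.
func (w *worker) busy() bool {
	return atomic.LoadInt32(&w.isUnderRecovery) == 1
}

func main() {
	w := &worker{}
	var wg sync.WaitGroup
	var winners int32
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if w.recover() {
				atomic.AddInt32(&winners, 1) // count goroutines that won the CAS
			}
		}()
	}
	wg.Wait()
	fmt.Println("winners:", winners, "busy now:", w.busy())
}
```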
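Reviewer note on the reaping change: startWorkerMonitor() now sizes defunctPids at 2*len(broker.pidworkermap)+1 and delegates to utility.ReapDefunctPids, with the loop treating the first zero entry as the end marker. Judging from the inline loop this replaces, the helper is presumably a non-blocking Wait4/WNOHANG drain; the sketch below shows that pattern under that assumption (reapDefunct returns a slice instead of filling a caller-provided one, so it is illustrative rather than the actual utility API):

```go
//go:build linux

package main

import (
	"errors"
	"fmt"
	"syscall"
)

// reapDefunct drains every already-exited child without blocking,
// the same Wait4/WNOHANG pattern the removed inline loop used.
func reapDefunct() []int32 {
	var pids []int32
	for {
		var status syscall.WaitStatus
		pid, err := syscall.Wait4(-1, &status, syscall.WNOHANG, nil)
		switch {
		case pid > 0:
			pids = append(pids, int32(pid)) // one zombie reaped; keep draining
		case pid == 0:
			return pids // children remain, but none have exited yet
		default:
			// ECHILD just means there are no children left to wait for.
			if !errors.Is(err, syscall.ECHILD) {
				fmt.Println("wait4 error:", err)
			}
			return pids
		}
	}
}

func main() {
	fmt.Println("reaped pids:", reapDefunct())
}
```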
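Reviewer note on the bind-eviction arithmetic exercised by the sqlEvict test: doBindEviction() skips a bind name/value pair unless it is seen on at least bind_eviction_threshold_pct percent of the dispatched workers, and a freshly created throttle starts at AllowEveryX = 3*len(entry.Workers)+1. A small worked sketch (threshold here is a hypothetical helper mirroring the guard, not a function in the repo):

```go
package main

import "fmt"

// threshold mirrors the guard in doBindEviction: a bind name/value pair
// is only throttled once it occupies at least pct percent of the
// currently dispatched workers.
func threshold(pct, dispatched int) int {
	return int(float64(pct) / 100. * float64(dispatched))
}

func main() {
	// bind_eviction_threshold_pct = 50, as in the sqlEvict test config.
	for _, dispatched := range []int{8, 25} {
		fmt.Printf("dispatched=%d: evict once a bind holds >= %d workers\n",
			dispatched, threshold(50, dispatched))
	}
	// A new throttle for a bind seen on 4 workers starts at AllowEveryX = 13,
	// so heavier offenders are throttled more aggressively.
	fmt.Println("AllowEveryX for 4 workers:", 3*4+1)
}
```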