Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docker/gorgon_couchbase/node/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ FROM ubuntu:24.04
ENV LANG=C.UTF-8

RUN apt-get -y update && apt-get -y install \
curl golang-go iptables netcat-openbsd procps python3 sudo tar unzip wget
curl golang-go iptables netcat-openbsd procps python3 sudo tar unzip wget tshark

ARG CB_URL=""
RUN if [ -n "$CB_URL" ]; then \
Expand All @@ -19,6 +19,7 @@ RUN if [ -n "$CB_URL" ]; then \
fi

RUN mkdir /root/deps
RUN mkdir -p /root/store/cbcollects_and_captures
ADD deps.tgz /root/deps
WORKDIR /root/deps/src/gorgon_couchbase
RUN go mod download
Expand Down
12 changes: 11 additions & 1 deletion src/gorgon/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ const exitUsage = 2
// Entry point for Gorgon; delegates to either control node (run) or worker node (rpc) based on command
func Main(db gorgon.Database) int {
var filter Filter

// Create an options object with some fields set, others defaulted.
opt := &gorgon.Options{
WorkloadDuration: time.Minute,
Concurrency: 6,
Expand All @@ -25,6 +27,7 @@ func Main(db gorgon.Database) int {
if ret := parseOptions(opt, &filter); ret != 0 {
return ret
}
// Validate the options and set them in the database, if there is an error log and exit with 1
if err := db.SetOptions(opt); err != nil {
log.Error("Error in Database.SetOptions: %v", err)
return 1
Expand Down Expand Up @@ -68,6 +71,10 @@ func cmdRun(db gorgon.Database, opt *gorgon.Options, filter *Filter) int {
}
// Verify linearizability/ sequential consistency of observed operations
if err := runner.Check(history, ""); err != nil {
if err == linearizabilityTimeoutErr || err == sequentialTimeoutErr {
log.Error("Consistency check timed out: %v", err)
return 3 // error code for unstable(timeout)
}
log.Error("Error in Runner.Check: %v", err)
return 1
}
Expand Down Expand Up @@ -125,6 +132,10 @@ func parseOptions(opt *gorgon.Options, filter *Filter) int {
"Don't stop a worker when its client returns an error that is not unambiguous")
flag.IntVar(&opt.RpcPort, "gorgon-rpc-port", opt.RpcPort, "RPC port to connect")
flag.StringVar(&opt.RpcPassword, "gorgon-rpc-password", opt.RpcPassword, "RPC password")
flag.BoolVar(&opt.CbcollectLogging, "gorgon-cbcollect-log", false, "boolean to enable cbcollect logs")
flag.BoolVar(&opt.NetworkTraceCapture, "gorgon-network-capture", false, "boolean to enable network trace capture")
flag.StringVar(&opt.LogDirectory, "gorgon-log-directory", "/root/store/cbcollects_and_captures/", "Path for cbcollect log zip file")
flag.StringVar(&opt.ErrOnTestFail, "gorgon-err-on-test-fail", "", "Consistency level that triggers a test failure error: linearizability or sequential")

flag.Parse()
if flag.NArg() == 0 {
Expand Down Expand Up @@ -163,6 +174,5 @@ func parseOptions(opt *gorgon.Options, filter *Filter) int {
fmt.Println("Minimum one node")
return exitUsage
}

return 0
}
52 changes: 40 additions & 12 deletions src/gorgon/cmd/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ import (
"github.com/couchbaselabs/gorgon/src/gorgon/log"
)

var (
linearizabilityErr = errors.New("Linearizability check failed")
linearizabilityTimeoutErr = errors.New("Linearizability check timed out")
sequentialErr = errors.New("Sequential consistency check failed")
sequentialTimeoutErr = errors.New("Sequential consistency check timed out")
)

type Runner struct {
name string
db gorgon.Database
Expand All @@ -41,10 +48,18 @@ func (runner *Runner) Name() string {
return runner.name
}

// This function prepares the Runner by setting up the database,
// creating clients, and setting up the workload generators.
func (runner *Runner) SetUp() error {
log.Info("[%s] Database SetUp", runner.name)
var setupComplete bool // To guard the defer
defer func() { // Tears down runner's db if partial setup
if !setupComplete {
err := runner.db.TearDown()
if err != nil {
log.Error("[%s] Error in database teardown: %v", runner.name, err)
}
}
}()

// Initialize the database before any client connections can be established
if err := runner.db.SetUp(); err != nil {
return err
Expand All @@ -61,14 +76,6 @@ func (runner *Runner) SetUp() error {
}
}
}()
// Teardown the db if in case setup happens only partially
defer func() {
if clients != nil {
if err := runner.db.TearDown(); err != nil {
log.Error("[%s] Error in Database.TearDown: %v", runner.name, err)
}
}
}()

// Retrieve connection config once to reuse for all clients
config := runner.db.ClientConfig()
Expand Down Expand Up @@ -97,6 +104,7 @@ func (runner *Runner) SetUp() error {
// Transfer ownership to runner struct; clearing local variable prevents defer from closing valid clients
runner.clients = clients
clients = nil
setupComplete = true
return nil
}

Expand Down Expand Up @@ -144,6 +152,7 @@ func (runner *Runner) Run() ([]gorgon.Operation, error) {
return operationList.Extract(), nil
}

// Database is shared across all workloads, so teardown happens once after all workloads complete
func (runner *Runner) TearDown() (retErr error) {
for _, gen := range runner.workload.Generators {
if err := gen.TearDown(); err != nil {
Expand All @@ -164,6 +173,10 @@ func (runner *Runner) TearDown() (retErr error) {
}
}
}
err := runner.db.TearDown()
if err != nil && retErr == nil {
retErr = err
}
return
}

Expand Down Expand Up @@ -209,6 +222,15 @@ func (runner *Runner) Check(history []gorgon.Operation, dir string) (err error)
level := log.INFO
// Save visualization for failed checks to help developers debug the violation
if result != porcupine.Ok {
if runner.options.ErrOnTestFail == "linearizability" {
if result == porcupine.Unknown { // partition check timed out
if err == nil { // prevents overwriting linearizabilityErr (linearizabilityErr > linearizabilityTimeoutErr)
err = linearizabilityTimeoutErr
}
} else {
err = linearizabilityErr
}
}
linearizable = false
level = log.WARNING
filePath := path.Join(dir, EscapeFileName(fmt.Sprintf(
Expand All @@ -221,7 +243,7 @@ func (runner *Runner) Check(history []gorgon.Operation, dir string) (err error)
log.Log(level, "[%s] Checked partition %d - %s", runner.name, i, result)
}

// Sequential consistency is a weaker guarantee; check when linearizability fails
// Sequential consistency is a weaker guarantee; verify it when linearizability fails
if !linearizable {
var hist [][]gorgon.Operation
for _, op := range history {
Expand All @@ -231,10 +253,16 @@ func (runner *Runner) Check(history []gorgon.Operation, dir string) (err error)
hist[op.ClientId] = append(hist[op.ClientId], op)
}

// Run the check for sequential consistency
result, info := CheckSeqnuentialConsistency(model, hist, time.Minute)
level := log.INFO
if result != checkers.Ok {
if err == nil && runner.options.ErrOnTestFail == "sequential" {
if result == checkers.Unknown {
err = sequentialTimeoutErr
} else {
err = sequentialErr
}
}
level = log.WARNING
}
filePath := path.Join(dir, EscapeFileName(fmt.Sprintf(
Expand Down
4 changes: 4 additions & 0 deletions src/gorgon/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ type Options struct {
ContinueAmbiguousClient bool
RpcPort int
RpcPassword string
LogDirectory string
Comment thread
chiragbytes7 marked this conversation as resolved.
CbcollectLogging bool
NetworkTraceCapture bool
ErrOnTestFail string
}

type Operation struct {
Expand Down
31 changes: 31 additions & 0 deletions src/gorgon/rpcs/cbcollect_rpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package rpcs

import (
"os"
"os/exec"
"path/filepath"
"time"

"github.com/couchbaselabs/gorgon/src/gorgon/log"
)

type CbcollectRpc struct{}

// Unnamed receiver as the receiver is used to access fields of the struct, in this case none
// arg string to specify the location of the zip file
func (*CbcollectRpc) CbCollectLogs(arg *string, reply *string) error {
dir := *arg
if _, err := os.Stat(dir); os.IsNotExist(err) {
log.Info("The directory for cbcollect does not exist")
*arg = "/root/store/cbcollects_and_captures/"
log.Info("writing to the default location (%s)", *arg)
}
timestamp := time.Now().Format("2006-01-02-150405")
*arg = filepath.Join(dir, timestamp+"-cbcollect.zip")
err := exec.Command("/opt/couchbase/bin/cbcollect_info", *arg).Run()
Comment thread
chiragbytes7 marked this conversation as resolved.
if err != nil {
return err
}
*reply = "ok"
return nil
}
67 changes: 67 additions & 0 deletions src/gorgon/rpcs/networktrace_rpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package rpcs

import (
"os"
"os/exec"
"path/filepath"
"strconv"
"time"

"github.com/couchbaselabs/gorgon/src/gorgon/log"
)

// Field in RPC struct to capture PID of initialized tshark
type NetworkCaptureRpc struct {
tsharkCmd *exec.Cmd
}

type NetworkCaptureConfig struct {
TsharkTimeout time.Duration
Directory string
}

// first arg for directory for captured network dump
func (rpc *NetworkCaptureRpc) StartCapture(arg *NetworkCaptureConfig, reply *string) error {
dir := arg.Directory
timeout := strconv.Itoa(int(arg.TsharkTimeout.Seconds()))
if _, err := os.Stat(dir); os.IsNotExist(err) {
log.Info("Provided directory doesn't exist, using default")
dir = "/root/store/cbcollects_and_captures"
}
timestamp := time.Now().Format("2006-01-02-150405")
outputFile := filepath.Join(dir, timestamp+"-capture.pcap")
rpc.tsharkCmd = exec.Command("tshark", "-i", "any", "-s", "0", "-a", "duration:"+timeout, "-w", outputFile)
err := rpc.tsharkCmd.Start()
if err != nil {
log.Error("Tshark start failed: %v", err)
return err
}
Comment thread
chiragbytes7 marked this conversation as resolved.
log.Info("Network Capture started with pid (%v)", rpc.tsharkCmd.Process.Pid)
*reply = "ok"
return nil
}

func (rpc *NetworkCaptureRpc) StopCapture(arg *string, reply *string) error {
if rpc.tsharkCmd != nil && rpc.tsharkCmd.Process != nil {
err := rpc.tsharkCmd.Process.Kill()
if err != nil {
log.Error("Failed to kill tshark: %v", err)
return err
}
log.Info("Successfully killed tshark with PID: %d", rpc.tsharkCmd.Process.Pid)

// Wait for the process to actually exit
_, err = rpc.tsharkCmd.Process.Wait()
if err != nil {
log.Warning("Process wait returned error: %v", err)
}

// Clear the cmd reference
rpc.tsharkCmd = nil
} else {
log.Info("No tshark process to stop")
}

*reply = "ok"
return nil
}
Loading